zrepl/rpc/grpcclientidentity/example/main.go

115 lines
2.6 KiB
Go
Raw Normal View History

// This package demonstrates how the grpcclientidentity package can be used
// to set up a gRPC greeter service.
package main
import (
"context"
"flag"
"fmt"
"os"
"time"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/rpc/grpcclientidentity/example/pdu"
"github.com/zrepl/zrepl/rpc/grpcclientidentity/grpchelper"
"github.com/zrepl/zrepl/transport/tcp"
)
var (
	// args holds the values parsed from the command line.
	args struct {
		mode string // bound to the -mode flag in main ("client" or "server")
	}

	// log is the package-level logger, created by logger.NewStderrDebugLogger.
	log = logger.NewStderrDebugLogger()
)
// main parses the -mode flag and runs the example either as the gRPC client
// or as the gRPC server. Any other mode is logged and the process exits with
// status 1.
func main() {
	flag.StringVar(&args.mode, "mode", "", "client|server")
	flag.Parse()

	switch args.mode {
	case "client":
		client()
	case "server":
		server()
	default:
		// The %q verb needs its operand; without it the message would read
		// `unknown mode %!q(MISSING)` and hide what the user actually passed.
		log.Printf("unknown mode %q", args.mode)
		os.Exit(1)
	}
}
// onErr reports a fatal error and terminates the process with status 1.
//
// The logged message is the formatted text (format applied to args), followed
// by ": " and the error itself; err is additionally attached to the log entry
// via WithError.
func onErr(err error, format string, args ...interface{}) {
	what := fmt.Sprintf(format, args...)
	message := fmt.Sprintf("%s: %s", what, err)

	entry := log.WithError(err)
	entry.Error(message)

	os.Exit(1)
}
func client() {
cn, err := tcp.TCPConnecterFromConfig(&config.TCPConnect{
ConnectCommon: config.ConnectCommon{
Type: "tcp",
},
Address: "127.0.0.1:8080",
DialTimeout: 10 * time.Second,
})
if err != nil {
onErr(err, "build connecter error")
}
clientConn := grpchelper.ClientConn(cn, log)
defer clientConn.Close()
// normal usage from here on
client := pdu.NewGreeterClient(clientConn)
resp, err := client.Greet(context.Background(), &pdu.GreetRequest{Name: "somethingimadeup"})
if err != nil {
onErr(err, "RPC error")
}
fmt.Printf("got response:\n\t%s\n", resp.GetMsg())
}
// clientIdentityKey is handed to grpchelper.NewServer in server() and is the
// context key under which Greet looks up the client identity of a request.
const clientIdentityKey = "clientIdentity"
func server() {
authListenerFactory, err := tcp.TCPListenerFactoryFromConfig(nil, &config.TCPServe{
ServeCommon: config.ServeCommon{
Type: "tcp",
},
Listen: "127.0.0.1:8080",
Clients: map[string]string{
"127.0.0.1": "localclient",
"::1": "localclient",
},
})
if err != nil {
onErr(err, "cannot build listener factory")
}
log := logger.NewStderrDebugLogger()
rpc: use grpchelper package, add grpc.KeepaliveEnforcementPolicy, fix 'transport is closing' error Symptom: zrepl log message: rpc error: code = Unavailable desc = transport is closing Underlying Problem: * rpc.NewServer was not using grpchelper.NewServer and not setting Server KeepaliveParams by itself * and even grpchelper.NewServer didn't set a KeepaliveEnforcementPolicy * However, KeepaliveEnforcementPolicy is necessary if the client keepalive is configured with non-default values * .. which grpchelper.ClientConn does, and that method is used by rpc.NewClient * rpc.Client was sending pings * lacking server-side KeepaliveEnforcementPolicy caused grpc-hard-coded `pingStrikes` counter to go past limit of 2: https://github.com/grpc/grpc-go/blob/021bd5734e43b1b11073ea326de29af4de3dfa9b/internal/transport/http2_server.go#L726 How was this debugged: * GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info PATH=/root/mockpath:$PATH zrepl daemon * with a patch on grpc package to get more log messages on pingStrikes increases: diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 8b04b039..f68f55ea 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -214,6 +214,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err if kep.MinTime == 0 { kep.MinTime = defaultKeepalivePolicyMinTime } + errorf("effective keepalive enforcement policy: %#v", kep) done := make(chan struct{}) t := &http2Server{ ctx: context.Background(), @@ -696,6 +697,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { t.controlBuf.put(pingAck) now := time.Now() + errorf("transport:ping handlePing, last ping %s ago", now.Sub(t.lastPingAt)) defer func() { t.lastPingAt = now }() @@ -713,11 +715,13 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { // Keepalive shouldn't be active thus, this new ping should // have come after at least defaultPingTimeout. 
if t.lastPingAt.Add(defaultPingTimeout).After(now) { + errorf("transport:ping strike ns < 1 && !t.kep.PermitWithoutStream") t.pingStrikes++ } } else { // Check if keepalive policy is respected. if t.lastPingAt.Add(t.kep.MinTime).After(now) { + errorf("transport:ping strike !(ns < 1 && !t.kep.PermitWithoutStream) kep.MinTime=%s ns=%d", t.kep.MinTime, ns) t.pingStrikes++ } } fixes #181
2020-01-04 21:00:00 +01:00
authListener, err := authListenerFactory()
if err != nil {
rpc: use grpchelper package, add grpc.KeepaliveEnforcementPolicy, fix 'transport is closing' error Symptom: zrepl log message: rpc error: code = Unavailable desc = transport is closing Underlying Problem: * rpc.NewServer was not using grpchelper.NewServer and not setting Server KeepaliveParams by itself * and even grpchelper.NewServer didn't set a KeepaliveEnforcementPolicy * However, KeepaliveEnforcementPolicy is necessary if the client keepalive is configured with non-default values * .. which grpchelper.ClientConn does, and that method is used by rpc.NewClient * rpc.Client was sending pings * lacking server-side KeepaliveEnforcementPolicy caused grpc-hard-coded `pingStrikes` counter to go past limit of 2: https://github.com/grpc/grpc-go/blob/021bd5734e43b1b11073ea326de29af4de3dfa9b/internal/transport/http2_server.go#L726 How was this debugged: * GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info PATH=/root/mockpath:$PATH zrepl daemon * with a patch on grpc package to get more log messages on pingStrikes increases: diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 8b04b039..f68f55ea 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -214,6 +214,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err if kep.MinTime == 0 { kep.MinTime = defaultKeepalivePolicyMinTime } + errorf("effective keepalive enforcement policy: %#v", kep) done := make(chan struct{}) t := &http2Server{ ctx: context.Background(), @@ -696,6 +697,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { t.controlBuf.put(pingAck) now := time.Now() + errorf("transport:ping handlePing, last ping %s ago", now.Sub(t.lastPingAt)) defer func() { t.lastPingAt = now }() @@ -713,11 +715,13 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { // Keepalive shouldn't be active thus, this new ping should // have come after at least defaultPingTimeout. 
if t.lastPingAt.Add(defaultPingTimeout).After(now) { + errorf("transport:ping strike ns < 1 && !t.kep.PermitWithoutStream") t.pingStrikes++ } } else { // Check if keepalive policy is respected. if t.lastPingAt.Add(t.kep.MinTime).After(now) { + errorf("transport:ping strike !(ns < 1 && !t.kep.PermitWithoutStream) kep.MinTime=%s ns=%d", t.kep.MinTime, ns) t.pingStrikes++ } } fixes #181
2020-01-04 21:00:00 +01:00
onErr(err, "cannot listen")
}
rpc: use grpchelper package, add grpc.KeepaliveEnforcementPolicy, fix 'transport is closing' error Symptom: zrepl log message: rpc error: code = Unavailable desc = transport is closing Underlying Problem: * rpc.NewServer was not using grpchelper.NewServer and not setting Server KeepaliveParams by itself * and even grpchelper.NewServer didn't set a KeepaliveEnforcementPolicy * However, KeepaliveEnforcementPolicy is necessary if the client keepalive is configured with non-default values * .. which grpchelper.ClientConn does, and that method is used by rpc.NewClient * rpc.Client was sending pings * lacking server-side KeepaliveEnforcementPolicy caused grpc-hard-coded `pingStrikes` counter to go past limit of 2: https://github.com/grpc/grpc-go/blob/021bd5734e43b1b11073ea326de29af4de3dfa9b/internal/transport/http2_server.go#L726 How was this debugged: * GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info PATH=/root/mockpath:$PATH zrepl daemon * with a patch on grpc package to get more log messages on pingStrikes increases: diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 8b04b039..f68f55ea 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -214,6 +214,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err if kep.MinTime == 0 { kep.MinTime = defaultKeepalivePolicyMinTime } + errorf("effective keepalive enforcement policy: %#v", kep) done := make(chan struct{}) t := &http2Server{ ctx: context.Background(), @@ -696,6 +697,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { t.controlBuf.put(pingAck) now := time.Now() + errorf("transport:ping handlePing, last ping %s ago", now.Sub(t.lastPingAt)) defer func() { t.lastPingAt = now }() @@ -713,11 +715,13 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { // Keepalive shouldn't be active thus, this new ping should // have come after at least defaultPingTimeout. 
if t.lastPingAt.Add(defaultPingTimeout).After(now) { + errorf("transport:ping strike ns < 1 && !t.kep.PermitWithoutStream") t.pingStrikes++ } } else { // Check if keepalive policy is respected. if t.lastPingAt.Add(t.kep.MinTime).After(now) { + errorf("transport:ping strike !(ns < 1 && !t.kep.PermitWithoutStream) kep.MinTime=%s ns=%d", t.kep.MinTime, ns) t.pingStrikes++ } } fixes #181
2020-01-04 21:00:00 +01:00
srv, serve := grpchelper.NewServer(authListener, clientIdentityKey, log, nil)
rpc: use grpchelper package, add grpc.KeepaliveEnforcementPolicy, fix 'transport is closing' error Symptom: zrepl log message: rpc error: code = Unavailable desc = transport is closing Underlying Problem: * rpc.NewServer was not using grpchelper.NewServer and not setting Server KeepaliveParams by itself * and even grpchelper.NewServer didn't set a KeepaliveEnforcementPolicy * However, KeepaliveEnforcementPolicy is necessary if the client keepalive is configured with non-default values * .. which grpchelper.ClientConn does, and that method is used by rpc.NewClient * rpc.Client was sending pings * lacking server-side KeepaliveEnforcementPolicy caused grpc-hard-coded `pingStrikes` counter to go past limit of 2: https://github.com/grpc/grpc-go/blob/021bd5734e43b1b11073ea326de29af4de3dfa9b/internal/transport/http2_server.go#L726 How was this debugged: * GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info PATH=/root/mockpath:$PATH zrepl daemon * with a patch on grpc package to get more log messages on pingStrikes increases: diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 8b04b039..f68f55ea 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -214,6 +214,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err if kep.MinTime == 0 { kep.MinTime = defaultKeepalivePolicyMinTime } + errorf("effective keepalive enforcement policy: %#v", kep) done := make(chan struct{}) t := &http2Server{ ctx: context.Background(), @@ -696,6 +697,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { t.controlBuf.put(pingAck) now := time.Now() + errorf("transport:ping handlePing, last ping %s ago", now.Sub(t.lastPingAt)) defer func() { t.lastPingAt = now }() @@ -713,11 +715,13 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { // Keepalive shouldn't be active thus, this new ping should // have come after at least defaultPingTimeout. 
if t.lastPingAt.Add(defaultPingTimeout).After(now) { + errorf("transport:ping strike ns < 1 && !t.kep.PermitWithoutStream") t.pingStrikes++ } } else { // Check if keepalive policy is respected. if t.lastPingAt.Add(t.kep.MinTime).After(now) { + errorf("transport:ping strike !(ns < 1 && !t.kep.PermitWithoutStream) kep.MinTime=%s ns=%d", t.kep.MinTime, ns) t.pingStrikes++ } } fixes #181
2020-01-04 21:00:00 +01:00
svc := &greeter{prepend: "hello "}
pdu.RegisterGreeterServer(srv, svc)
if err := serve(); err != nil {
onErr(err, "error serving")
}
}
// greeter is the service value that server() registers through
// pdu.RegisterGreeterServer; its Greet method answers the RPC.
type greeter struct {
	// NOTE(review): presumably the generated embed that opts out of
	// forward-compatible server embedding — confirm against the pdu package.
	pdu.UnsafeGreeterServer
	// prepend is placed in front of the requested name in every Greet response.
	prepend string
}
// Greet answers a greeting request. The response message is g.prepend followed
// by the requested name and the quoted client identity read from ctx under
// clientIdentityKey. The returned error is always nil.
func (g *greeter) Greet(ctx context.Context, r *pdu.GreetRequest) (*pdu.GreetResponse, error) {
	// A missing or non-string context value leaves identity as "".
	identity, _ := ctx.Value(clientIdentityKey).(string)
	log.WithField("clientIdentity", identity).Info("Greet() request") // show that we got the client identity

	msg := fmt.Sprintf("%s%s (clientIdentity=%q)", g.prepend, r.GetName(), identity)
	return &pdu.GreetResponse{Msg: msg}, nil
}