2018-12-11 22:01:50 +01:00
|
|
|
package rpc
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"time"
|
|
|
|
|
2024-10-18 19:21:17 +02:00
|
|
|
"github.com/zrepl/zrepl/internal/endpoint"
|
|
|
|
"github.com/zrepl/zrepl/internal/replication/logic/pdu"
|
|
|
|
"github.com/zrepl/zrepl/internal/rpc/dataconn"
|
|
|
|
"github.com/zrepl/zrepl/internal/rpc/grpcclientidentity"
|
|
|
|
"github.com/zrepl/zrepl/internal/rpc/grpcclientidentity/grpchelper"
|
|
|
|
"github.com/zrepl/zrepl/internal/rpc/versionhandshake"
|
|
|
|
"github.com/zrepl/zrepl/internal/transport"
|
|
|
|
"github.com/zrepl/zrepl/internal/util/envconst"
|
2018-12-11 22:01:50 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// Handler bundles the capabilities the passive side of a replication
// setup must implement: the gRPC replication endpoints
// (pdu.ReplicationServer, registered on the control server) and the
// dataconn stream handler (served on the data connection).
type Handler interface {
	pdu.ReplicationServer
	dataconn.Handler
}
|
|
|
|
|
|
|
|
type serveFunc func(ctx context.Context, demuxedListener transport.AuthenticatedListener, errOut chan<- error)
|
|
|
|
|
|
|
|
// Server abstracts the accept and request routing infrastructure for the
// passive side of a replication setup.
type Server struct {
	logger             Logger           // general-purpose logger (loggers.General in NewServer)
	handler            Handler          // serves both control (pdu) and data (dataconn) requests
	controlServerServe serveFunc        // serve loop for the gRPC control server
	dataServer         *dataconn.Server // data-connection server constructed in NewServer
	dataServerServe    serveFunc        // serve loop wrapping dataServer.Serve
}
|
|
|
|
|
2020-04-11 15:49:41 +02:00
|
|
|
// HandlerContextInterceptorData describes the request that a
// HandlerContextInterceptor is being invoked for.
type HandlerContextInterceptorData interface {
	// FullMethod returns the invoked method; as constructed in NewServer it
	// is prefixed with the transport scheme ("control://" or "data://").
	FullMethod() string
	// ClientIdentity returns the authenticated client identity.
	ClientIdentity() string
}
|
|
|
|
|
|
|
|
// interceptorData adapts the per-transport interceptor data types
// (grpcclientidentity.ContextInterceptorData, dataconn.ContextInterceptorData)
// to HandlerContextInterceptorData, namespacing the method name with a
// scheme prefix so interceptors can tell control and data requests apart.
type interceptorData struct {
	prefixMethod string // scheme prefix, e.g. "control://" or "data://"
	wrapped      HandlerContextInterceptorData
}
|
|
|
|
|
|
|
|
func (d interceptorData) ClientIdentity() string { return d.wrapped.ClientIdentity() }
|
|
|
|
func (d interceptorData) FullMethod() string { return d.prefixMethod + d.wrapped.FullMethod() }
|
|
|
|
|
|
|
|
type HandlerContextInterceptor func(ctx context.Context, data HandlerContextInterceptorData, handler func(ctx context.Context))
|
2018-12-11 22:01:50 +01:00
|
|
|
|
|
|
|
// config must be valid (use its Validate function).
|
|
|
|
func NewServer(handler Handler, loggers Loggers, ctxInterceptor HandlerContextInterceptor) *Server {
|
|
|
|
|
2019-12-22 16:31:33 +01:00
|
|
|
// setup control server
|
2018-12-11 22:01:50 +01:00
|
|
|
controlServerServe := func(ctx context.Context, controlListener transport.AuthenticatedListener, errOut chan<- error) {
|
rpc: use grpchelper package, add grpc.KeepaliveEnforcementPolicy, fix 'transport is closing' error
Symptom: zrepl log message:
rpc error: code = Unavailable desc = transport is closing
Underlying Problem:
* rpc.NewServer was not using grpchelper.NewServer and not setting Server KeepaliveParams by itself
* and even grpchelper.NewServer didn't set a KeepaliveEnforcementPolicy
* However, KeepaliveEnforcementPolicy is necessary if the client keepalive is configured with non-default values
* .. which grpchelper.ClientConn does, and that method is used by rpc.NewClient
* rpc.Client was sending pings
* lacking server-side KeepaliveEnforcementPolicy caused grpc-hard-coded `pingStrikes` counter to go past limit of 2:
https://github.com/grpc/grpc-go/blob/021bd5734e43b1b11073ea326de29af4de3dfa9b/internal/transport/http2_server.go#L726
How was this debugged:
* GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info PATH=/root/mockpath:$PATH zrepl daemon
* with a patch on grpc package to get more log messages on pingStrikes increases:
diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go
index 8b04b039..f68f55ea 100644
--- a/internal/transport/http2_server.go
+++ b/internal/transport/http2_server.go
@@ -214,6 +214,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
+ errorf("effective keepalive enforcement policy: %#v", kep)
done := make(chan struct{})
t := &http2Server{
ctx: context.Background(),
@@ -696,6 +697,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
t.controlBuf.put(pingAck)
now := time.Now()
+ errorf("transport:ping handlePing, last ping %s ago", now.Sub(t.lastPingAt))
defer func() {
t.lastPingAt = now
}()
@@ -713,11 +715,13 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
// Keepalive shouldn't be active thus, this new ping should
// have come after at least defaultPingTimeout.
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
+ errorf("transport:ping strike ns < 1 && !t.kep.PermitWithoutStream")
t.pingStrikes++
}
} else {
// Check if keepalive policy is respected.
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
+ errorf("transport:ping strike !(ns < 1 && !t.kep.PermitWithoutStream) kep.MinTime=%s ns=%d", t.kep.MinTime, ns)
t.pingStrikes++
}
}
fixes #181
2020-01-04 21:00:00 +01:00
|
|
|
|
2020-04-11 15:49:41 +02:00
|
|
|
var controlCtxInterceptor grpcclientidentity.Interceptor = func(ctx context.Context, data grpcclientidentity.ContextInterceptorData, handler func(ctx context.Context)) {
|
|
|
|
ctxInterceptor(ctx, interceptorData{"control://", data}, handler)
|
|
|
|
}
|
|
|
|
controlServer, serve := grpchelper.NewServer(controlListener, endpoint.ClientIdentityKey, loggers.Control, controlCtxInterceptor)
|
rpc: use grpchelper package, add grpc.KeepaliveEnforcementPolicy, fix 'transport is closing' error
Symptom: zrepl log message:
rpc error: code = Unavailable desc = transport is closing
Underlying Problem:
* rpc.NewServer was not using grpchelper.NewServer and not setting Server KeepaliveParams by itself
* and even grpchelper.NewServer didn't set a KeepaliveEnforcementPolicy
* However, KeepaliveEnforcementPolicy is necessary if the client keepalive is configured with non-default values
* .. which grpchelper.ClientConn does, and that method is used by rpc.NewClient
* rpc.Client was sending pings
* lacking server-side KeepaliveEnforcementPolicy caused grpc-hard-coded `pingStrikes` counter to go past limit of 2:
https://github.com/grpc/grpc-go/blob/021bd5734e43b1b11073ea326de29af4de3dfa9b/internal/transport/http2_server.go#L726
How was this debugged:
* GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info PATH=/root/mockpath:$PATH zrepl daemon
* with a patch on grpc package to get more log messages on pingStrikes increases:
diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go
index 8b04b039..f68f55ea 100644
--- a/internal/transport/http2_server.go
+++ b/internal/transport/http2_server.go
@@ -214,6 +214,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
+ errorf("effective keepalive enforcement policy: %#v", kep)
done := make(chan struct{})
t := &http2Server{
ctx: context.Background(),
@@ -696,6 +697,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
t.controlBuf.put(pingAck)
now := time.Now()
+ errorf("transport:ping handlePing, last ping %s ago", now.Sub(t.lastPingAt))
defer func() {
t.lastPingAt = now
}()
@@ -713,11 +715,13 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
// Keepalive shouldn't be active thus, this new ping should
// have come after at least defaultPingTimeout.
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
+ errorf("transport:ping strike ns < 1 && !t.kep.PermitWithoutStream")
t.pingStrikes++
}
} else {
// Check if keepalive policy is respected.
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
+ errorf("transport:ping strike !(ns < 1 && !t.kep.PermitWithoutStream) kep.MinTime=%s ns=%d", t.kep.MinTime, ns)
t.pingStrikes++
}
}
fixes #181
2020-01-04 21:00:00 +01:00
|
|
|
pdu.RegisterReplicationServer(controlServer, handler)
|
|
|
|
|
2018-12-11 22:01:50 +01:00
|
|
|
// give time for graceful stop until deadline expires, then hard stop
|
|
|
|
go func() {
|
|
|
|
<-ctx.Done()
|
|
|
|
if dl, ok := ctx.Deadline(); ok {
|
|
|
|
go time.AfterFunc(dl.Sub(dl), controlServer.Stop)
|
|
|
|
}
|
2020-04-20 21:24:42 +02:00
|
|
|
loggers.Control.Debug("gracefully shutting down control server")
|
2018-12-11 22:01:50 +01:00
|
|
|
controlServer.GracefulStop()
|
2020-04-20 21:24:42 +02:00
|
|
|
loggers.Control.Debug("gracdeful shut down of control server complete")
|
2018-12-11 22:01:50 +01:00
|
|
|
}()
|
|
|
|
|
rpc: use grpchelper package, add grpc.KeepaliveEnforcementPolicy, fix 'transport is closing' error
Symptom: zrepl log message:
rpc error: code = Unavailable desc = transport is closing
Underlying Problem:
* rpc.NewServer was not using grpchelper.NewServer and not setting Server KeepaliveParams by itself
* and even grpchelper.NewServer didn't set a KeepaliveEnforcementPolicy
* However, KeepaliveEnforcementPolicy is necessary if the client keepalive is configured with non-default values
* .. which grpchelper.ClientConn does, and that method is used by rpc.NewClient
* rpc.Client was sending pings
* lacking server-side KeepaliveEnforcementPolicy caused grpc-hard-coded `pingStrikes` counter to go past limit of 2:
https://github.com/grpc/grpc-go/blob/021bd5734e43b1b11073ea326de29af4de3dfa9b/internal/transport/http2_server.go#L726
How was this debugged:
* GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info PATH=/root/mockpath:$PATH zrepl daemon
* with a patch on grpc package to get more log messages on pingStrikes increases:
diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go
index 8b04b039..f68f55ea 100644
--- a/internal/transport/http2_server.go
+++ b/internal/transport/http2_server.go
@@ -214,6 +214,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
+ errorf("effective keepalive enforcement policy: %#v", kep)
done := make(chan struct{})
t := &http2Server{
ctx: context.Background(),
@@ -696,6 +697,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
t.controlBuf.put(pingAck)
now := time.Now()
+ errorf("transport:ping handlePing, last ping %s ago", now.Sub(t.lastPingAt))
defer func() {
t.lastPingAt = now
}()
@@ -713,11 +715,13 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
// Keepalive shouldn't be active thus, this new ping should
// have come after at least defaultPingTimeout.
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
+ errorf("transport:ping strike ns < 1 && !t.kep.PermitWithoutStream")
t.pingStrikes++
}
} else {
// Check if keepalive policy is respected.
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
+ errorf("transport:ping strike !(ns < 1 && !t.kep.PermitWithoutStream) kep.MinTime=%s ns=%d", t.kep.MinTime, ns)
t.pingStrikes++
}
}
fixes #181
2020-01-04 21:00:00 +01:00
|
|
|
errOut <- serve()
|
2018-12-11 22:01:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// setup data server
|
|
|
|
dataServerClientIdentitySetter := func(ctx context.Context, wire *transport.AuthConn) (context.Context, *transport.AuthConn) {
|
|
|
|
ci := wire.ClientIdentity()
|
|
|
|
ctx = context.WithValue(ctx, endpoint.ClientIdentityKey, ci)
|
|
|
|
return ctx, wire
|
|
|
|
}
|
2020-04-11 15:49:41 +02:00
|
|
|
var dataCtxInterceptor dataconn.ContextInterceptor = func(ctx context.Context, data dataconn.ContextInterceptorData, handler func(ctx context.Context)) {
|
|
|
|
ctxInterceptor(ctx, interceptorData{"data://", data}, handler)
|
|
|
|
}
|
|
|
|
dataServer := dataconn.NewServer(dataServerClientIdentitySetter, dataCtxInterceptor, loggers.Data, handler)
|
2018-12-11 22:01:50 +01:00
|
|
|
dataServerServe := func(ctx context.Context, dataListener transport.AuthenticatedListener, errOut chan<- error) {
|
|
|
|
dataServer.Serve(ctx, dataListener)
|
|
|
|
errOut <- nil // TODO bad design of dataServer?
|
|
|
|
}
|
|
|
|
|
|
|
|
server := &Server{
|
|
|
|
logger: loggers.General,
|
|
|
|
handler: handler,
|
|
|
|
controlServerServe: controlServerServe,
|
|
|
|
dataServer: dataServer,
|
|
|
|
dataServerServe: dataServerServe,
|
|
|
|
}
|
|
|
|
|
|
|
|
return server
|
|
|
|
}
|
|
|
|
|
|
|
|
// The context is used for cancellation only.
|
|
|
|
// Serve never returns an error, it logs them to the Server's logger.
|
|
|
|
func (s *Server) Serve(ctx context.Context, l transport.AuthenticatedListener) {
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
2020-04-20 21:24:42 +02:00
|
|
|
defer cancel()
|
|
|
|
defer s.logger.Debug("rpc.(*Server).Serve done")
|
2018-12-11 22:01:50 +01:00
|
|
|
|
|
|
|
l = versionhandshake.Listener(l, envconst.Duration("ZREPL_RPC_SERVER_VERSIONHANDSHAKE_TIMEOUT", 10*time.Second))
|
|
|
|
|
|
|
|
// it is important that demux's context is cancelled,
|
|
|
|
// it has background goroutines attached
|
|
|
|
demuxListener := demux(ctx, l)
|
|
|
|
|
|
|
|
serveErrors := make(chan error, 2)
|
|
|
|
go s.controlServerServe(ctx, demuxListener.control, serveErrors)
|
|
|
|
go s.dataServerServe(ctx, demuxListener.data, serveErrors)
|
|
|
|
select {
|
|
|
|
case serveErr := <-serveErrors:
|
|
|
|
s.logger.WithError(serveErr).Error("serve error")
|
|
|
|
s.logger.Debug("wait for other server to shut down")
|
|
|
|
cancel()
|
|
|
|
secondServeErr := <-serveErrors
|
|
|
|
s.logger.WithError(secondServeErr).Error("serve error")
|
|
|
|
case <-ctx.Done():
|
|
|
|
s.logger.Debug("context cancelled, wait for control and data servers")
|
|
|
|
cancel()
|
|
|
|
for i := 0; i < 2; i++ {
|
|
|
|
<-serveErrors
|
|
|
|
}
|
|
|
|
s.logger.Debug("control and data server shut down, returning from Serve")
|
|
|
|
}
|
|
|
|
}
|