rpc: fix missing logger context vars in control connection handlers

use ctxInterceptor in gRPC interceptors
also panic if the unimplemented stream interceptor is used
This commit is contained in:
Christian Schwarz 2019-12-22 16:31:33 +01:00
parent 6ebd9f1037
commit 5b52e5e331
4 changed files with 14 additions and 6 deletions

View File

@ -99,7 +99,9 @@ func (*transportCredentials) OverrideServerName(string) error {
panic("not implemented") panic("not implemented")
} }
func NewInterceptors(logger Logger, clientIdentityKey interface{}) (unary grpc.UnaryServerInterceptor, stream grpc.StreamServerInterceptor) { type ContextInterceptor = func(ctx context.Context) context.Context
func NewInterceptors(logger Logger, clientIdentityKey interface{}, ctxInterceptor ContextInterceptor) (unary grpc.UnaryServerInterceptor, stream grpc.StreamServerInterceptor) {
unary = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { unary = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
logger.WithField("fullMethod", info.FullMethod).Debug("request") logger.WithField("fullMethod", info.FullMethod).Debug("request")
p, ok := peer.FromContext(ctx) p, ok := peer.FromContext(ctx)
@ -113,8 +115,13 @@ func NewInterceptors(logger Logger, clientIdentityKey interface{}) (unary grpc.U
} }
logger.WithField("peer_client_identity", a.clientIdentity).Debug("peer client identity") logger.WithField("peer_client_identity", a.clientIdentity).Debug("peer client identity")
ctx = context.WithValue(ctx, clientIdentityKey, a.clientIdentity) ctx = context.WithValue(ctx, clientIdentityKey, a.clientIdentity)
if ctxInterceptor != nil {
ctx = ctxInterceptor(ctx)
}
return handler(ctx, req) return handler(ctx, req)
} }
stream = nil stream = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
panic("unimplemented")
}
return return
} }

View File

@ -92,7 +92,7 @@ func server() {
onErr(err, "cannot listen") onErr(err, "cannot listen")
} }
srv, serve := grpchelper.NewServer(authListener, clientIdentityKey, log) srv, serve := grpchelper.NewServer(authListener, clientIdentityKey, log, nil)
svc := &greeter{"hello "} svc := &greeter{"hello "}
pdu.RegisterGreeterServer(srv, svc) pdu.RegisterGreeterServer(srv, svc)

View File

@ -50,7 +50,7 @@ func ClientConn(cn transport.Connecter, log Logger) *grpc.ClientConn {
} }
// NewServer is a convenience interface around the TransportCredentials and Interceptors interface. // NewServer is a convenience interface around the TransportCredentials and Interceptors interface.
func NewServer(authListener transport.AuthenticatedListener, clientIdentityKey interface{}, logger grpcclientidentity.Logger) (srv *grpc.Server, serve func() error) { func NewServer(authListener transport.AuthenticatedListener, clientIdentityKey interface{}, logger grpcclientidentity.Logger, ctxInterceptor grpcclientidentity.ContextInterceptor) (srv *grpc.Server, serve func() error) {
ka := grpc.KeepaliveParams(keepalive.ServerParameters{ ka := grpc.KeepaliveParams(keepalive.ServerParameters{
Time: StartKeepalivesAfterInactivityDuration, Time: StartKeepalivesAfterInactivityDuration,
Timeout: KeepalivePeerTimeout, Timeout: KeepalivePeerTimeout,
@ -60,7 +60,7 @@ func NewServer(authListener transport.AuthenticatedListener, clientIdentityKey i
PermitWithoutStream: true, PermitWithoutStream: true,
}) })
tcs := grpcclientidentity.NewTransportCredentials(logger) tcs := grpcclientidentity.NewTransportCredentials(logger)
unary, stream := grpcclientidentity.NewInterceptors(logger, clientIdentityKey) unary, stream := grpcclientidentity.NewInterceptors(logger, clientIdentityKey, ctxInterceptor)
srv = grpc.NewServer(grpc.Creds(tcs), grpc.UnaryInterceptor(unary), grpc.StreamInterceptor(stream), ka, ep) srv = grpc.NewServer(grpc.Creds(tcs), grpc.UnaryInterceptor(unary), grpc.StreamInterceptor(stream), ka, ep)
serve = func() error { serve = func() error {

View File

@ -35,9 +35,10 @@ type HandlerContextInterceptor func(ctx context.Context) context.Context
// config must be valid (use its Validate function). // config must be valid (use its Validate function).
func NewServer(handler Handler, loggers Loggers, ctxInterceptor HandlerContextInterceptor) *Server { func NewServer(handler Handler, loggers Loggers, ctxInterceptor HandlerContextInterceptor) *Server {
// setup control server
controlServerServe := func(ctx context.Context, controlListener transport.AuthenticatedListener, errOut chan<- error) { controlServerServe := func(ctx context.Context, controlListener transport.AuthenticatedListener, errOut chan<- error) {
controlServer, serve := grpchelper.NewServer(controlListener, endpoint.ClientIdentityKey, loggers.Control) controlServer, serve := grpchelper.NewServer(controlListener, endpoint.ClientIdentityKey, loggers.Control, ctxInterceptor)
pdu.RegisterReplicationServer(controlServer, handler) pdu.RegisterReplicationServer(controlServer, handler)
// give time for graceful stop until deadline expires, then hard stop // give time for graceful stop until deadline expires, then hard stop