From 2c3b3c093d7338ae65ff04a473007a3cd4d0d42a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Mar 2019 16:16:56 +0100 Subject: [PATCH] rpc: do not leak grpc state change logger goroutine --- rpc/rpc_client.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/rpc/rpc_client.go b/rpc/rpc_client.go index 018ee3f..818cd44 100644 --- a/rpc/rpc_client.go +++ b/rpc/rpc_client.go @@ -29,6 +29,7 @@ type Client struct { controlClient pdu.ReplicationClient // this the grpc client instance, see constructor controlConn *grpc.ClientConn loggers Loggers + closed chan struct{} } var _ logic.Endpoint = &Client{} @@ -46,14 +47,21 @@ func NewClient(cn transport.Connecter, loggers Loggers) *Client { c := &Client{ loggers: loggers, + closed: make(chan struct{}), } grpcConn := grpchelper.ClientConn(muxedConnecter.control, loggers.Control) go func() { - for { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-c.closed + cancel() + }() + defer cancel() + for ctx.Err() == nil { state := grpcConn.GetState() loggers.General.WithField("grpc_state", state.String()).Debug("grpc state change") - grpcConn.WaitForStateChange(context.TODO(), state) + grpcConn.WaitForStateChange(ctx, state) } }() c.controlClient = pdu.NewReplicationClient(grpcConn) @@ -64,8 +72,9 @@ func NewClient(cn transport.Connecter, loggers Loggers) *Client { } func (c *Client) Close() { + close(c.closed) if err := c.controlConn.Close(); err != nil { - c.loggers.General.WithError(err).Error("cannot cloe control connection") + c.loggers.General.WithError(err).Error("cannot close control connection") } // TODO c.dataClient should have Close() }