diff --git a/rpc/rpc_client.go b/rpc/rpc_client.go index 018ee3f..818cd44 100644 --- a/rpc/rpc_client.go +++ b/rpc/rpc_client.go @@ -29,6 +29,7 @@ type Client struct { controlClient pdu.ReplicationClient // this the grpc client instance, see constructor controlConn *grpc.ClientConn loggers Loggers + closed chan struct{} } var _ logic.Endpoint = &Client{} @@ -46,14 +47,21 @@ func NewClient(cn transport.Connecter, loggers Loggers) *Client { c := &Client{ loggers: loggers, + closed: make(chan struct{}), } grpcConn := grpchelper.ClientConn(muxedConnecter.control, loggers.Control) go func() { - for { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-c.closed + cancel() + }() + defer cancel() + for ctx.Err() == nil { state := grpcConn.GetState() loggers.General.WithField("grpc_state", state.String()).Debug("grpc state change") - grpcConn.WaitForStateChange(context.TODO(), state) + grpcConn.WaitForStateChange(ctx, state) } }() c.controlClient = pdu.NewReplicationClient(grpcConn) @@ -64,8 +72,9 @@ func NewClient(cn transport.Connecter, loggers Loggers) *Client { } func (c *Client) Close() { + close(c.closed) if err := c.controlConn.Close(); err != nil { - c.loggers.General.WithError(err).Error("cannot cloe control connection") + c.loggers.General.WithError(err).Error("cannot close control connection") } // TODO c.dataClient should have Close() }