From 051fd3a4d75b573e5db1e08759c7f10e1ab5ba30 Mon Sep 17 00:00:00 2001 From: Misha Bragin Date: Mon, 26 Sep 2022 18:02:20 +0200 Subject: [PATCH] Fix Management and Signal gRPC client stream leak (#482) --- management/client/grpc.go | 13 +++++++++---- signal/client/grpc.go | 16 ++++++++-------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/management/client/grpc.go b/management/client/grpc.go index a2847e8b7..0da261b98 100644 --- a/management/client/grpc.go +++ b/management/client/grpc.go @@ -109,7 +109,9 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error return err } - stream, err := c.connectToStream(*serverPubKey) + ctx, cancelStream := context.WithCancel(c.ctx) + defer cancelStream() + stream, err := c.connectToStream(ctx, *serverPubKey) if err != nil { log.Debugf("failed to open Management Service stream: %s", err) if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { @@ -145,7 +147,7 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error return nil } -func (c *GrpcClient) connectToStream(serverPubKey wgtypes.Key) (proto.ManagementService_SyncClient, error) { +func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.Key) (proto.ManagementService_SyncClient, error) { req := &proto.SyncRequest{} myPrivateKey := c.key @@ -156,9 +158,12 @@ func (c *GrpcClient) connectToStream(serverPubKey wgtypes.Key) (proto.Management log.Errorf("failed encrypting message: %s", err) return nil, err } - syncReq := &proto.EncryptedMessage{WgPubKey: myPublicKey.String(), Body: encryptedReq} - return c.realClient.Sync(c.ctx, syncReq) + sync, err := c.realClient.Sync(ctx, syncReq) + if err != nil { + return nil, err + } + return sync, nil } func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error { diff --git a/signal/client/grpc.go b/signal/client/grpc.go index a9e5567c7..36b4833fb 100644 --- a/signal/client/grpc.go +++ b/signal/client/grpc.go @@ -87,7 +87,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo }, nil } -//defaultBackoff is a basic backoff mechanism for general issues +// defaultBackoff is a basic backoff mechanism for general issues func defaultBackoff(ctx context.Context) backoff.BackOff { return backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 800 * time.Millisecond, @@ -121,9 +121,11 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error { return fmt.Errorf("connection to signal is not ready and in %s state", connState) } - // connect to Signal stream identifying ourselves with a public Wireguard key + // connect to Signal stream identifying ourselves with a public WireGuard key // todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management) - stream, err := c.connect(c.key.PublicKey().String()) + ctx, cancelStream := context.WithCancel(c.ctx) + defer cancelStream() + stream, err := c.connect(ctx, c.key.PublicKey().String()) if err != nil { log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) return err @@ -180,15 +182,13 @@ func (c *GrpcClient) getStreamStatusChan() <-chan struct{} { return c.connectedCh } -func (c *GrpcClient) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) { +func (c *GrpcClient) connect(ctx context.Context, key string) (proto.SignalExchange_ConnectStreamClient, error) { c.stream = nil // add key fingerprint to the request header to be identified on the server side md := metadata.New(map[string]string{proto.HeaderId: key}) - ctx := metadata.NewOutgoingContext(c.ctx, md) - - stream, err := c.realClient.ConnectStream(ctx, grpc.WaitForReady(true)) - + metaCtx := metadata.NewOutgoingContext(ctx, md) + stream, err := c.realClient.ConnectStream(metaCtx, grpc.WaitForReady(true)) c.stream = stream if err != nil { return nil, err