From d08e5efbce2c5795266f313ac075d4c37ec5ad7f Mon Sep 17 00:00:00 2001 From: Mikhail Bragin Date: Sun, 14 Nov 2021 19:41:17 +0100 Subject: [PATCH] fix: too many open files caused by agent not being closed (#154) * fix: too many open files caused by agent not being closed after unsuccessful attempts to start a peer connection (happens when no network available) * fix: minor refactor to consider signal status --- client/internal/connection.go | 10 ++++++++-- client/internal/engine.go | 7 ++++++- signal/client/client.go | 24 ++++++++++++++---------- signal/client/client_test.go | 2 +- 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/client/internal/connection.go b/client/internal/connection.go index ed3dfc057..0a2662aed 100644 --- a/client/internal/connection.go +++ b/client/internal/connection.go @@ -138,12 +138,18 @@ func (conn *Connection) Open(timeout time.Duration) error { return !ok }, }) - conn.agent = a - if err != nil { return err } + conn.agent = a + defer func() { + err := conn.agent.Close() + if err != nil { + return + } + }() + err = conn.listenOnLocalCandidates() if err != nil { return err diff --git a/client/internal/engine.go b/client/internal/engine.go index 4145665e9..ce7421adf 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -148,6 +148,11 @@ func (e *Engine) initializePeer(peer Peer) { }, e.ctx) operation := func() error { + + if e.signal.GetStatus() != signal.StreamConnected { + return fmt.Errorf("not opening connection to peer because Signal is unavailable") + } + _, err := e.openPeerConnection(e.wgPort, e.config.WgPrivateKey, peer) e.peerMux.Lock() defer e.peerMux.Unlock() @@ -157,7 +162,7 @@ func (e *Engine) initializePeer(peer Peer) { } if err != nil { - log.Infof("retrying connection because of error: %s", err.Error()) + log.Debugf("retrying connection because of error: %s", err.Error()) return err } return nil diff --git a/signal/client/client.go b/signal/client/client.go index f49375cc4..005c1bac7 100644 --- a/signal/client/client.go +++ b/signal/client/client.go @@ -27,8 +27,8 @@ import ( // Status is the status of the client type Status string -const streamConnected Status = "streamConnected" -const streamDisconnected Status = "streamDisconnected" +const StreamConnected Status = "Connected" +const StreamDisconnected Status = "Disconnected" // Client Wraps the Signal Exchange Service gRpc client type Client struct { @@ -40,10 +40,14 @@ type Client struct { // connectedCh used to notify goroutines waiting for the connection to the Signal stream connectedCh chan struct{} mux sync.Mutex - // streamConnected indicates whether this client is streamConnected to the Signal stream + // StreamConnected indicates whether this client is StreamConnected to the Signal stream status Status } +func (c *Client) GetStatus() Status { + return c.status +} + // Close Closes underlying connections to the Signal Exchange func (c *Client) Close() error { return c.signalConn.Close() @@ -81,7 +85,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo signalConn: conn, key: key, mux: sync.Mutex{}, - status: streamDisconnected, + status: StreamDisconnected, }, nil } @@ -120,18 +124,18 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error { // todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management) stream, err := c.connect(c.key.PublicKey().String()) if err != nil { - log.Warnf("streamDisconnected from the Signal Exchange due to an error: %v", err) + log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) return err } c.notifyStreamConnected() - log.Infof("streamConnected to the Signal Service stream") + log.Infof("connected to the Signal Service stream") // start receiving messages from the Signal stream (from other peers through signal) err = c.receive(stream, msgHandler) if err != nil { - log.Warnf("streamDisconnected from the Signal Exchange due to an error: %v", err) + log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) backOff.Reset() return err } @@ -150,13 +154,13 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error { func (c *Client) notifyStreamDisconnected() { c.mux.Lock() defer c.mux.Unlock() - c.status = streamDisconnected + c.status = StreamDisconnected } func (c *Client) notifyStreamConnected() { c.mux.Lock() defer c.mux.Unlock() - c.status = streamConnected + c.status = StreamConnected if c.connectedCh != nil { // there are goroutines waiting on this channel -> release them close(c.connectedCh) @@ -208,7 +212,7 @@ func (c *Client) ready() bool { // WaitStreamConnected waits until the client is connected to the Signal stream func (c *Client) WaitStreamConnected() { - if c.status == streamConnected { + if c.status == StreamConnected { return } diff --git a/signal/client/client_test.go b/signal/client/client_test.go index 55aeaf2c6..c2898e363 100644 --- a/signal/client/client_test.go +++ b/signal/client/client_test.go @@ -36,7 +36,7 @@ var _ = Describe("Client", func() { }) Describe("Exchanging messages", func() { - Context("between streamConnected peers", func() { + Context("between connected peers", func() { It("should be successful", func() { var msgReceived sync.WaitGroup