mirror of
https://github.com/netbirdio/netbird.git
synced 2025-04-10 10:48:55 +02:00
fix: too many open files caused by agent not being closed (#154)
* fix: too many open files caused by agent not being closed after unsuccessful attempts to start a peer connection (happens when no network available) * fix: minor refactor to consider signal status
This commit is contained in:
parent
95ef8547f3
commit
d08e5efbce
@ -138,12 +138,18 @@ func (conn *Connection) Open(timeout time.Duration) error {
|
|||||||
return !ok
|
return !ok
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
conn.agent = a
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conn.agent = a
|
||||||
|
defer func() {
|
||||||
|
err := conn.agent.Close()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
err = conn.listenOnLocalCandidates()
|
err = conn.listenOnLocalCandidates()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -148,6 +148,11 @@ func (e *Engine) initializePeer(peer Peer) {
|
|||||||
}, e.ctx)
|
}, e.ctx)
|
||||||
|
|
||||||
operation := func() error {
|
operation := func() error {
|
||||||
|
|
||||||
|
if e.signal.GetStatus() != signal.StreamConnected {
|
||||||
|
return fmt.Errorf("not opening connection to peer because Signal is unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
_, err := e.openPeerConnection(e.wgPort, e.config.WgPrivateKey, peer)
|
_, err := e.openPeerConnection(e.wgPort, e.config.WgPrivateKey, peer)
|
||||||
e.peerMux.Lock()
|
e.peerMux.Lock()
|
||||||
defer e.peerMux.Unlock()
|
defer e.peerMux.Unlock()
|
||||||
@ -157,7 +162,7 @@ func (e *Engine) initializePeer(peer Peer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Infof("retrying connection because of error: %s", err.Error())
|
log.Debugf("retrying connection because of error: %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -27,8 +27,8 @@ import (
|
|||||||
// Status is the status of the client
|
// Status is the status of the client
|
||||||
type Status string
|
type Status string
|
||||||
|
|
||||||
const streamConnected Status = "streamConnected"
|
const StreamConnected Status = "Connected"
|
||||||
const streamDisconnected Status = "streamDisconnected"
|
const StreamDisconnected Status = "Disconnected"
|
||||||
|
|
||||||
// Client Wraps the Signal Exchange Service gRpc client
|
// Client Wraps the Signal Exchange Service gRpc client
|
||||||
type Client struct {
|
type Client struct {
|
||||||
@ -40,10 +40,14 @@ type Client struct {
|
|||||||
// connectedCh used to notify goroutines waiting for the connection to the Signal stream
|
// connectedCh used to notify goroutines waiting for the connection to the Signal stream
|
||||||
connectedCh chan struct{}
|
connectedCh chan struct{}
|
||||||
mux sync.Mutex
|
mux sync.Mutex
|
||||||
// streamConnected indicates whether this client is streamConnected to the Signal stream
|
// StreamConnected indicates whether this client is StreamConnected to the Signal stream
|
||||||
status Status
|
status Status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) GetStatus() Status {
|
||||||
|
return c.status
|
||||||
|
}
|
||||||
|
|
||||||
// Close Closes underlying connections to the Signal Exchange
|
// Close Closes underlying connections to the Signal Exchange
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
return c.signalConn.Close()
|
return c.signalConn.Close()
|
||||||
@ -81,7 +85,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
|
|||||||
signalConn: conn,
|
signalConn: conn,
|
||||||
key: key,
|
key: key,
|
||||||
mux: sync.Mutex{},
|
mux: sync.Mutex{},
|
||||||
status: streamDisconnected,
|
status: StreamDisconnected,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -120,18 +124,18 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error {
|
|||||||
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
|
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
|
||||||
stream, err := c.connect(c.key.PublicKey().String())
|
stream, err := c.connect(c.key.PublicKey().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("streamDisconnected from the Signal Exchange due to an error: %v", err)
|
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.notifyStreamConnected()
|
c.notifyStreamConnected()
|
||||||
|
|
||||||
log.Infof("streamConnected to the Signal Service stream")
|
log.Infof("connected to the Signal Service stream")
|
||||||
|
|
||||||
// start receiving messages from the Signal stream (from other peers through signal)
|
// start receiving messages from the Signal stream (from other peers through signal)
|
||||||
err = c.receive(stream, msgHandler)
|
err = c.receive(stream, msgHandler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("streamDisconnected from the Signal Exchange due to an error: %v", err)
|
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||||
backOff.Reset()
|
backOff.Reset()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -150,13 +154,13 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error {
|
|||||||
func (c *Client) notifyStreamDisconnected() {
|
func (c *Client) notifyStreamDisconnected() {
|
||||||
c.mux.Lock()
|
c.mux.Lock()
|
||||||
defer c.mux.Unlock()
|
defer c.mux.Unlock()
|
||||||
c.status = streamDisconnected
|
c.status = StreamDisconnected
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) notifyStreamConnected() {
|
func (c *Client) notifyStreamConnected() {
|
||||||
c.mux.Lock()
|
c.mux.Lock()
|
||||||
defer c.mux.Unlock()
|
defer c.mux.Unlock()
|
||||||
c.status = streamConnected
|
c.status = StreamConnected
|
||||||
if c.connectedCh != nil {
|
if c.connectedCh != nil {
|
||||||
// there are goroutines waiting on this channel -> release them
|
// there are goroutines waiting on this channel -> release them
|
||||||
close(c.connectedCh)
|
close(c.connectedCh)
|
||||||
@ -208,7 +212,7 @@ func (c *Client) ready() bool {
|
|||||||
// WaitStreamConnected waits until the client is connected to the Signal stream
|
// WaitStreamConnected waits until the client is connected to the Signal stream
|
||||||
func (c *Client) WaitStreamConnected() {
|
func (c *Client) WaitStreamConnected() {
|
||||||
|
|
||||||
if c.status == streamConnected {
|
if c.status == StreamConnected {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ var _ = Describe("Client", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
Describe("Exchanging messages", func() {
|
Describe("Exchanging messages", func() {
|
||||||
Context("between streamConnected peers", func() {
|
Context("between connected peers", func() {
|
||||||
It("should be successful", func() {
|
It("should be successful", func() {
|
||||||
|
|
||||||
var msgReceived sync.WaitGroup
|
var msgReceived sync.WaitGroup
|
||||||
|
Loading…
Reference in New Issue
Block a user