Fix Register/Deregister race on Signal (#431)

This PR fixes a race condition that happens
when agents connect to a Signal stream, multiple
times within a short amount of time. Common on
slow and unstable internet connections.
Every time an agent establishes a new connection
to Signal, Signal creates a Stream and writes an entry
to the registry of connected peers storing the stream.
Every time an agent disconnects, Signal removes the
stream from the registry.
Due to unstable connections, the agent could detect
a broken connection, and attempt to reconnect to Signal.
Signal will override the stream, but it might detect
the old broken connection later, causing peer deregistration.
It will deregister the peer leaving the client thinking
it is still connected, rejecting any messages.
This commit is contained in:
Misha Bragin 2022-08-22 12:21:19 +02:00 committed by GitHub
parent 000ea72aec
commit 762a26dcea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 21 deletions

View File

@ -382,8 +382,6 @@ func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtyp
},
})
if err != nil {
log.Errorf("failed signaling candidate to the remote peer %s %s", remoteKey.String(), err)
// todo ??
return err
}
@ -704,6 +702,7 @@ func (e Engine) peerExists(peerKey string) bool {
}
func (e Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, error) {
log.Debugf("creating peer connection %s", pubKey)
var stunTurn []*ice.URL
stunTurn = append(stunTurn, e.STUNs...)
stunTurn = append(stunTurn, e.TURNs...)

View File

@ -139,6 +139,7 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error {
// we need this reset because after a successful connection and a consequent error, backoff lib doesn't
// reset times and next try will start with a long delay
backOff.Reset()
log.Warnf("disconnected from the Signal service but will retry silently. Reason: %v", err)
return err
}

View File

@ -4,6 +4,7 @@ import (
"github.com/netbirdio/netbird/signal/proto"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
// Peer representation of a connected Peer
@ -11,6 +12,8 @@ type Peer struct {
// a unique id of the Peer (e.g. sha256 fingerprint of the Wireguard public key)
Id string
StreamID int64
//a gRpc connection stream to the Peer
Stream proto.SignalExchange_ConnectStreamServer
}
@ -18,20 +21,25 @@ type Peer struct {
// NewPeer creates a new instance of a connected Peer
func NewPeer(id string, stream proto.SignalExchange_ConnectStreamServer) *Peer {
return &Peer{
Id: id,
Stream: stream,
Id: id,
Stream: stream,
StreamID: time.Now().UnixNano(),
}
}
// Registry registry that holds all currently connected Peers
// Registry that holds all currently connected Peers
type Registry struct {
// Peer.key -> Peer
Peers sync.Map
// regMutex ensures that registration and de-registrations are safe
regMutex sync.Mutex
}
// NewRegistry creates a new connected Peer registry
func NewRegistry() *Registry {
return &Registry{}
return &Registry{
regMutex: sync.Mutex{},
}
}
// Get gets a peer from the registry
@ -52,20 +60,34 @@ func (registry *Registry) IsPeerRegistered(peerId string) bool {
// Register registers peer in the registry
func (registry *Registry) Register(peer *Peer) {
// can be that peer already exists but it is fine (e.g. reconnect)
// todo investigate what happens to the old peer (especially Peer.Stream) when we override it
registry.Peers.Store(peer.Id, peer)
log.Debugf("peer registered [%s]", peer.Id)
registry.regMutex.Lock()
defer registry.regMutex.Unlock()
}
// Deregister deregister Peer from the Registry (usually once it disconnects)
func (registry *Registry) Deregister(peer *Peer) {
_, loaded := registry.Peers.LoadAndDelete(peer.Id)
// can be that peer already exists, but it is fine (e.g. reconnect)
p, loaded := registry.Peers.LoadOrStore(peer.Id, peer)
if loaded {
log.Debugf("peer deregistered [%s]", peer.Id)
} else {
log.Warnf("attempted to remove non-existent peer [%s]", peer.Id)
pp := p.(*Peer)
log.Warnf("peer [%s] is already registered [new streamID %d, previous StreamID %d]. Will override stream.",
peer.Id, peer.StreamID, pp.StreamID)
registry.Peers.Store(peer.Id, peer)
}
log.Debugf("peer registered [%s]", peer.Id)
}
// Deregister Peer from the Registry (usually once it disconnects)
func (registry *Registry) Deregister(peer *Peer) {
registry.regMutex.Lock()
defer registry.regMutex.Unlock()
p, loaded := registry.Peers.LoadAndDelete(peer.Id)
if loaded {
pp := p.(*Peer)
if peer.StreamID < pp.StreamID {
registry.Peers.Store(peer.Id, p)
log.Warnf("attempted to remove newer registered stream of a peer [%s] [newer streamID %d, previous StreamID %d]. Ignoring.",
peer.Id, pp.StreamID, peer.StreamID)
return
}
}
log.Debugf("peer deregistered [%s]", peer.Id)
}

View File

@ -1,9 +1,34 @@
package peer
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestRegistry_ShouldNotDeregisterWhenHasNewerStreamRegistered(t *testing.T) {
r := NewRegistry()
peerID := "peer"
olderPeer := NewPeer(peerID, nil)
r.Register(olderPeer)
time.Sleep(time.Nanosecond)
newerPeer := NewPeer(peerID, nil)
r.Register(newerPeer)
registered, _ := r.Get(olderPeer.Id)
assert.NotNil(t, registered, "peer can't be nil")
assert.Equal(t, newerPeer, registered)
r.Deregister(olderPeer)
registered, _ = r.Get(olderPeer.Id)
assert.NotNil(t, registered, "peer can't be nil")
assert.Equal(t, newerPeer, registered)
}
func TestRegistry_GetNonExistentPeer(t *testing.T) {
r := NewRegistry()

View File

@ -55,7 +55,7 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
}
defer func() {
log.Infof("peer disconnected [%s] ", p.Id)
log.Infof("peer disconnected [%s] [streamID %d] ", p.Id, p.StreamID)
s.registry.Deregister(p)
}()
@ -66,7 +66,7 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
return err
}
log.Infof("peer connected [%s]", p.Id)
log.Infof("peer connected [%s] [streamID %d] ", p.Id, p.StreamID)
for {
//read incoming messages