[client] Refactor peer state change subscription mechanism (#3910)

* Refactor peer state change subscription mechanism

Because the code generated a new channel for every single event, it was easy to miss a notification.
Use a single channel instead.

* Fix lint

* Avoid potential deadlock

* Fix test

* Add context

* Fix test
This commit is contained in:
Zoltan Papp 2025-06-03 09:20:33 +02:00 committed by GitHub
parent 35287f8241
commit af27aaf9af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 78 additions and 27 deletions

View File

@ -1,6 +1,7 @@
package peer package peer
import ( import (
"context"
"errors" "errors"
"net/netip" "net/netip"
"slices" "slices"
@ -146,11 +147,31 @@ type FullStatus struct {
LazyConnectionEnabled bool LazyConnectionEnabled bool
} }
type StatusChangeSubscription struct {
peerID string
id string
eventsChan chan struct{}
ctx context.Context
}
func newStatusChangeSubscription(ctx context.Context, peerID string) *StatusChangeSubscription {
return &StatusChangeSubscription{
ctx: ctx,
peerID: peerID,
id: uuid.New().String(),
eventsChan: make(chan struct{}, 1),
}
}
func (s *StatusChangeSubscription) Events() chan struct{} {
return s.eventsChan
}
// Status holds a state of peers, signal, management connections and relays // Status holds a state of peers, signal, management connections and relays
type Status struct { type Status struct {
mux sync.Mutex mux sync.Mutex
peers map[string]State peers map[string]State
changeNotify map[string]chan struct{} changeNotify map[string]map[string]*StatusChangeSubscription // map[peerID]map[subscriptionID]*StatusChangeSubscription
signalState bool signalState bool
signalError error signalError error
managementState bool managementState bool
@ -187,7 +208,7 @@ type Status struct {
func NewRecorder(mgmAddress string) *Status { func NewRecorder(mgmAddress string) *Status {
return &Status{ return &Status{
peers: make(map[string]State), peers: make(map[string]State),
changeNotify: make(map[string]chan struct{}), changeNotify: make(map[string]map[string]*StatusChangeSubscription),
eventStreams: make(map[string]chan *proto.SystemEvent), eventStreams: make(map[string]chan *proto.SystemEvent),
eventQueue: NewEventQueue(eventQueueSize), eventQueue: NewEventQueue(eventQueueSize),
offlinePeers: make([]State, 0), offlinePeers: make([]State, 0),
@ -312,7 +333,6 @@ func (d *Status) UpdatePeerState(receivedState State) error {
// when we close the connection we will not notify the router manager // when we close the connection we will not notify the router manager
if receivedState.ConnStatus == StatusIdle { if receivedState.ConnStatus == StatusIdle {
d.notifyPeerStateChangeListeners(receivedState.PubKey) d.notifyPeerStateChangeListeners(receivedState.PubKey)
} }
return nil return nil
} }
@ -552,19 +572,41 @@ func (d *Status) FinishPeerListModifications() {
d.notifyPeerListChanged() d.notifyPeerListChanged()
} }
// GetPeerStateChangeNotifier returns a change notifier channel for a peer func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string) *StatusChangeSubscription {
func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} {
d.mux.Lock() d.mux.Lock()
defer d.mux.Unlock() defer d.mux.Unlock()
ch, found := d.changeNotify[peer] sub := newStatusChangeSubscription(ctx, peerID)
if found { if _, ok := d.changeNotify[peerID]; !ok {
return ch d.changeNotify[peerID] = make(map[string]*StatusChangeSubscription)
}
d.changeNotify[peerID][sub.id] = sub
return sub
}
func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscription) {
d.mux.Lock()
defer d.mux.Unlock()
if subscription == nil {
return
} }
ch = make(chan struct{}) channels, ok := d.changeNotify[subscription.peerID]
d.changeNotify[peer] = ch if !ok {
return ch return
}
sub, exists := channels[subscription.id]
if !exists {
return
}
delete(channels, subscription.id)
if len(channels) == 0 {
delete(d.changeNotify, sub.peerID)
}
} }
// GetLocalPeerState returns the local peer state // GetLocalPeerState returns the local peer state
@ -939,13 +981,20 @@ func (d *Status) onConnectionChanged() {
// notifyPeerStateChangeListeners notifies route manager about the change in peer state // notifyPeerStateChangeListeners notifies route manager about the change in peer state
func (d *Status) notifyPeerStateChangeListeners(peerID string) { func (d *Status) notifyPeerStateChangeListeners(peerID string) {
ch, found := d.changeNotify[peerID] subs, ok := d.changeNotify[peerID]
if !found { if !ok {
return return
} }
for _, sub := range subs {
close(ch) // block on the write because we do not want to miss a notification
delete(d.changeNotify, peerID) // must run in a goroutine to be sure GetPeerState() is called on a separate thread
go func() {
select {
case sub.eventsChan <- struct{}{}:
case <-sub.ctx.Done():
}
}()
}
} }
func (d *Status) notifyPeerListChanged() { func (d *Status) notifyPeerListChanged() {

View File

@ -1,6 +1,7 @@
package peer package peer
import ( import (
"context"
"errors" "errors"
"sync" "sync"
"testing" "testing"
@ -86,8 +87,8 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) {
status := NewRecorder("https://mgm") status := NewRecorder("https://mgm")
_ = status.AddPeer(key, "abc.netbird", ip) _ = status.AddPeer(key, "abc.netbird", ip)
ch := status.GetPeerStateChangeNotifier(key) sub := status.SubscribeToPeerStateChanges(context.Background(), key)
assert.NotNil(t, ch, "channel shouldn't be nil") assert.NotNil(t, sub, "channel shouldn't be nil")
peerState := State{ peerState := State{
PubKey: key, PubKey: key,
@ -99,10 +100,12 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) {
err := status.UpdatePeerRelayedStateToDisconnected(peerState) err := status.UpdatePeerRelayedStateToDisconnected(peerState)
assert.NoError(t, err, "shouldn't return error") assert.NoError(t, err, "shouldn't return error")
timeoutCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
select { select {
case <-ch: case <-sub.eventsChan:
default: case <-timeoutCtx.Done():
t.Errorf("channel wasn't closed after update") t.Errorf("timed out waiting for event")
} }
} }

View File

@ -224,19 +224,18 @@ func (c *clientNetwork) getBestRouteFromStatuses(routePeerStatuses map[route.ID]
} }
func (c *clientNetwork) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan struct{}, closer chan struct{}) { func (c *clientNetwork) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan struct{}, closer chan struct{}) {
subscription := c.statusRecorder.SubscribeToPeerStateChanges(ctx, peerKey)
defer c.statusRecorder.UnsubscribePeerStateChanges(subscription)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-closer: case <-closer:
return return
case <-c.statusRecorder.GetPeerStateChangeNotifier(peerKey): case <-subscription.Events():
state, err := c.statusRecorder.GetPeer(peerKey)
if err != nil {
continue
}
peerStateUpdate <- struct{}{} peerStateUpdate <- struct{}{}
log.Debugf("triggered route state update for Peer %s, state: %s", peerKey, state.ConnStatus) log.Debugf("triggered route state update for Peer: %s", peerKey)
} }
} }
} }