mirror of
https://github.com/netbirdio/netbird.git
synced 2024-11-07 08:44:07 +01:00
765aba2c1c
propagate context from all the API calls and log request ID, account ID and peer ID --------- Co-authored-by: Zoltan Papp <zoltan.pmail@gmail.com>
173 lines
4.3 KiB
Go
173 lines
4.3 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/netbirdio/netbird/management/proto"
|
|
"github.com/netbirdio/netbird/management/server/telemetry"
|
|
)
|
|
|
|
// channelBufferSize is the buffer capacity of every per-peer update channel
// made by CreateChannel. SendUpdate performs a non-blocking send, so an update
// is dropped when a peer's buffer is full and no receiver is ready.
const channelBufferSize = 100
|
|
|
|
type UpdateMessage struct {
|
|
Update *proto.SyncResponse
|
|
}
|
|
|
|
type PeersUpdateManager struct {
|
|
// peerChannels is an update channel indexed by Peer.ID
|
|
peerChannels map[string]chan *UpdateMessage
|
|
// channelsMux keeps the mutex to access peerChannels
|
|
channelsMux *sync.Mutex
|
|
// metrics provides method to collect application metrics
|
|
metrics telemetry.AppMetrics
|
|
}
|
|
|
|
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
|
|
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
|
|
return &PeersUpdateManager{
|
|
peerChannels: make(map[string]chan *UpdateMessage),
|
|
channelsMux: &sync.Mutex{},
|
|
metrics: metrics,
|
|
}
|
|
}
|
|
|
|
// SendUpdate sends update message to the peer's channel
|
|
func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
|
|
start := time.Now()
|
|
var found, dropped bool
|
|
|
|
p.channelsMux.Lock()
|
|
defer func() {
|
|
p.channelsMux.Unlock()
|
|
if p.metrics != nil {
|
|
p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped)
|
|
}
|
|
}()
|
|
|
|
if channel, ok := p.peerChannels[peerID]; ok {
|
|
found = true
|
|
select {
|
|
case channel <- update:
|
|
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
|
|
default:
|
|
dropped = true
|
|
log.WithContext(ctx).Warnf("channel for peer %s is %d full", peerID, len(channel))
|
|
}
|
|
} else {
|
|
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
|
|
}
|
|
}
|
|
|
|
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
|
|
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
|
|
start := time.Now()
|
|
|
|
closed := false
|
|
|
|
p.channelsMux.Lock()
|
|
defer func() {
|
|
p.channelsMux.Unlock()
|
|
if p.metrics != nil {
|
|
p.metrics.UpdateChannelMetrics().CountCreateChannelDuration(time.Since(start), closed)
|
|
}
|
|
}()
|
|
|
|
if channel, ok := p.peerChannels[peerID]; ok {
|
|
closed = true
|
|
delete(p.peerChannels, peerID)
|
|
close(channel)
|
|
}
|
|
// mbragin: todo shouldn't it be more? or configurable?
|
|
channel := make(chan *UpdateMessage, channelBufferSize)
|
|
p.peerChannels[peerID] = channel
|
|
|
|
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
|
|
|
|
return channel
|
|
}
|
|
|
|
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
|
|
if channel, ok := p.peerChannels[peerID]; ok {
|
|
delete(p.peerChannels, peerID)
|
|
close(channel)
|
|
}
|
|
|
|
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
|
|
}
|
|
|
|
// CloseChannels closes updates channel for each given peer
|
|
func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string) {
|
|
start := time.Now()
|
|
|
|
p.channelsMux.Lock()
|
|
defer func() {
|
|
p.channelsMux.Unlock()
|
|
if p.metrics != nil {
|
|
p.metrics.UpdateChannelMetrics().CountCloseChannelsDuration(time.Since(start), len(peerIDs))
|
|
}
|
|
}()
|
|
|
|
for _, id := range peerIDs {
|
|
p.closeChannel(ctx, id)
|
|
}
|
|
}
|
|
|
|
// CloseChannel closes updates channel of a given peer
|
|
func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
|
|
start := time.Now()
|
|
|
|
p.channelsMux.Lock()
|
|
defer func() {
|
|
p.channelsMux.Unlock()
|
|
if p.metrics != nil {
|
|
p.metrics.UpdateChannelMetrics().CountCloseChannelDuration(time.Since(start))
|
|
}
|
|
}()
|
|
|
|
p.closeChannel(ctx, peerID)
|
|
}
|
|
|
|
// GetAllConnectedPeers returns a copy of the connected peers map
|
|
func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {
|
|
start := time.Now()
|
|
|
|
p.channelsMux.Lock()
|
|
|
|
m := make(map[string]struct{})
|
|
|
|
defer func() {
|
|
p.channelsMux.Unlock()
|
|
if p.metrics != nil {
|
|
p.metrics.UpdateChannelMetrics().CountGetAllConnectedPeersDuration(time.Since(start), len(m))
|
|
}
|
|
}()
|
|
|
|
for ID := range p.peerChannels {
|
|
m[ID] = struct{}{}
|
|
}
|
|
|
|
return m
|
|
}
|
|
|
|
// HasChannel returns true if peers has channel in update manager, otherwise false
|
|
func (p *PeersUpdateManager) HasChannel(peerID string) bool {
|
|
start := time.Now()
|
|
|
|
p.channelsMux.Lock()
|
|
|
|
defer func() {
|
|
p.channelsMux.Unlock()
|
|
if p.metrics != nil {
|
|
p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start))
|
|
}
|
|
}()
|
|
|
|
_, ok := p.peerChannels[peerID]
|
|
|
|
return ok
|
|
}
|