mirror of
https://github.com/netbirdio/netbird.git
synced 2025-08-18 11:00:06 +02:00
[client] Add UI client event notifications (#3207)
This commit is contained in:
@@ -7,21 +7,31 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/exp/maps"
|
||||
"google.golang.org/grpc/codes"
|
||||
gstatus "google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
"github.com/netbirdio/netbird/client/internal/relay"
|
||||
"github.com/netbirdio/netbird/client/proto"
|
||||
"github.com/netbirdio/netbird/management/domain"
|
||||
relayClient "github.com/netbirdio/netbird/relay/client"
|
||||
)
|
||||
|
||||
const eventQueueSize = 10
|
||||
|
||||
type ResolvedDomainInfo struct {
|
||||
Prefixes []netip.Prefix
|
||||
ParentDomain domain.Domain
|
||||
}
|
||||
|
||||
type EventListener interface {
|
||||
OnEvent(event *proto.SystemEvent)
|
||||
}
|
||||
|
||||
// State contains the latest state of a peer
|
||||
type State struct {
|
||||
Mux *sync.RWMutex
|
||||
@@ -157,6 +167,10 @@ type Status struct {
|
||||
peerListChangedForNotification bool
|
||||
|
||||
relayMgr *relayClient.Manager
|
||||
|
||||
eventMux sync.RWMutex
|
||||
eventStreams map[string]chan *proto.SystemEvent
|
||||
eventQueue *EventQueue
|
||||
}
|
||||
|
||||
// NewRecorder returns a new Status instance
|
||||
@@ -164,6 +178,8 @@ func NewRecorder(mgmAddress string) *Status {
|
||||
return &Status{
|
||||
peers: make(map[string]State),
|
||||
changeNotify: make(map[string]chan struct{}),
|
||||
eventStreams: make(map[string]chan *proto.SystemEvent),
|
||||
eventQueue: NewEventQueue(eventQueueSize),
|
||||
offlinePeers: make([]State, 0),
|
||||
notifier: newNotifier(),
|
||||
mgmAddress: mgmAddress,
|
||||
@@ -806,3 +822,112 @@ func (d *Status) notifyAddressChanged() {
|
||||
func (d *Status) numOfPeers() int {
|
||||
return len(d.peers) + len(d.offlinePeers)
|
||||
}
|
||||
|
||||
// PublishEvent adds an event to the queue and distributes it to all subscribers
|
||||
func (d *Status) PublishEvent(
|
||||
severity proto.SystemEvent_Severity,
|
||||
category proto.SystemEvent_Category,
|
||||
msg string,
|
||||
userMsg string,
|
||||
metadata map[string]string,
|
||||
) {
|
||||
event := &proto.SystemEvent{
|
||||
Id: uuid.New().String(),
|
||||
Severity: severity,
|
||||
Category: category,
|
||||
Message: msg,
|
||||
UserMessage: userMsg,
|
||||
Metadata: metadata,
|
||||
Timestamp: timestamppb.Now(),
|
||||
}
|
||||
|
||||
d.eventMux.Lock()
|
||||
defer d.eventMux.Unlock()
|
||||
|
||||
d.eventQueue.Add(event)
|
||||
|
||||
for _, stream := range d.eventStreams {
|
||||
select {
|
||||
case stream <- event:
|
||||
default:
|
||||
log.Debugf("event stream buffer full, skipping event: %v", event)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("event published: %v", event)
|
||||
}
|
||||
|
||||
// SubscribeToEvents returns a new event subscription
|
||||
func (d *Status) SubscribeToEvents() *EventSubscription {
|
||||
d.eventMux.Lock()
|
||||
defer d.eventMux.Unlock()
|
||||
|
||||
id := uuid.New().String()
|
||||
stream := make(chan *proto.SystemEvent, 10)
|
||||
d.eventStreams[id] = stream
|
||||
|
||||
return &EventSubscription{
|
||||
id: id,
|
||||
events: stream,
|
||||
}
|
||||
}
|
||||
|
||||
// UnsubscribeFromEvents removes an event subscription
|
||||
func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
|
||||
if sub == nil {
|
||||
return
|
||||
}
|
||||
|
||||
d.eventMux.Lock()
|
||||
defer d.eventMux.Unlock()
|
||||
|
||||
if stream, exists := d.eventStreams[sub.id]; exists {
|
||||
close(stream)
|
||||
delete(d.eventStreams, sub.id)
|
||||
}
|
||||
}
|
||||
|
||||
// GetEventHistory returns all events in the queue
|
||||
func (d *Status) GetEventHistory() []*proto.SystemEvent {
|
||||
return d.eventQueue.GetAll()
|
||||
}
|
||||
|
||||
type EventQueue struct {
|
||||
maxSize int
|
||||
events []*proto.SystemEvent
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
func NewEventQueue(size int) *EventQueue {
|
||||
return &EventQueue{
|
||||
maxSize: size,
|
||||
events: make([]*proto.SystemEvent, 0, size),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *EventQueue) Add(event *proto.SystemEvent) {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
|
||||
q.events = append(q.events, event)
|
||||
|
||||
if len(q.events) > q.maxSize {
|
||||
q.events = q.events[len(q.events)-q.maxSize:]
|
||||
}
|
||||
}
|
||||
|
||||
func (q *EventQueue) GetAll() []*proto.SystemEvent {
|
||||
q.mutex.RLock()
|
||||
defer q.mutex.RUnlock()
|
||||
|
||||
return slices.Clone(q.events)
|
||||
}
|
||||
|
||||
type EventSubscription struct {
|
||||
id string
|
||||
events chan *proto.SystemEvent
|
||||
}
|
||||
|
||||
func (s *EventSubscription) Events() <-chan *proto.SystemEvent {
|
||||
return s.events
|
||||
}
|
||||
|
Reference in New Issue
Block a user