diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 2b27f9e0f..5e9c9ea02 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -184,7 +184,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi return err } - updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID) + updateBuffer := s.peersUpdateManager.CreateChannel(ctx, peer.ID) s.ephemeralManager.OnPeerConnected(ctx, peer) @@ -199,37 +199,24 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart)) - return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv) + return s.handleUpdates(ctx, accountID, peerKey, peer, updateBuffer, srv) } // handleUpdates sends updates to the connected peer until the updates channel is closed. -func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error { +func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates *UpdateBuffer, srv proto.ManagementService_SyncServer) error { log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String()) + for { - select { - // condition when there are some updates - case update, open := <-updates: - if s.appMetrics != nil { - s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1) - } - - if !open { - log.WithContext(ctx).Debugf("updates channel for peer %s was closed", peerKey.String()) - s.cancelPeerRoutines(ctx, accountID, peer) - return nil - } - log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String()) - - if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil { - return err - } - - // condition when client <-> server connection has been terminated - case <-srv.Context().Done(): - // happens when connection drops, e.g. client disconnects - log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String()) + update, ok := updates.Pop(ctx) + if !ok { + log.WithContext(ctx).Debugf("update buffer for peer %s closed", peerKey.String()) s.cancelPeerRoutines(ctx, accountID, peer) - return srv.Context().Err() + return nil + } + + log.WithContext(ctx).Debugf("sending latest update to peer %s", peerKey.String()) + if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil { + return err } } } diff --git a/management/server/update_buffer.go b/management/server/update_buffer.go new file mode 100644 index 000000000..c6ce37152 --- /dev/null +++ b/management/server/update_buffer.go @@ -0,0 +1,67 @@ +package server + +import ( + "context" + "sync" +) + +type UpdateBuffer struct { + mu sync.Mutex + cond *sync.Cond + update *UpdateMessage + closed bool +} + +func NewUpdateBuffer() *UpdateBuffer { + ub := &UpdateBuffer{} + ub.cond = sync.NewCond(&ub.mu) + return ub +} + +func (b *UpdateBuffer) Push(update *UpdateMessage) { + b.mu.Lock() + defer b.mu.Unlock() + + // the equal case we need because we don't always increment the serial number + if b.update == nil || update.NetworkMap.Network.Serial >= b.update.NetworkMap.Network.Serial { + b.update = update + b.cond.Signal() + } +} + +func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) { + b.mu.Lock() + defer b.mu.Unlock() + + for b.update == nil && !b.closed { + waitCh := make(chan struct{}) + go func() { + b.cond.Wait() + close(waitCh) + }() + + b.mu.Unlock() + select { + case <-ctx.Done(): + b.mu.Lock() + return nil, false + case <-waitCh: + // Wakeup due to Push() or Close() + } + b.mu.Lock() + } + + if b.closed { + return nil, false + } + msg := b.update + b.update = nil + return msg, true +} + +func (b *UpdateBuffer) Close() { + b.mu.Lock() + b.closed = true + b.cond.Broadcast() + b.mu.Unlock() +} diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index be4d1fd3a..8938d52ce 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -21,7 +21,7 @@ type UpdateMessage struct { type PeersUpdateManager struct { // peerChannels is an update channel indexed by Peer.ID - peerChannels map[string]chan *UpdateMessage + peerChannels map[string]*UpdateBuffer // channelsMux keeps the mutex to access peerChannels channelsMux *sync.RWMutex // metrics provides method to collect application metrics @@ -31,7 +31,7 @@ type PeersUpdateManager struct { // NewPeersUpdateManager returns a new instance of PeersUpdateManager func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager { return &PeersUpdateManager{ - peerChannels: make(map[string]chan *UpdateMessage), + peerChannels: make(map[string]*UpdateBuffer), channelsMux: &sync.RWMutex{}, metrics: metrics, } @@ -53,27 +53,14 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda if channel, ok := p.peerChannels[peerID]; ok { found = true - for { - select { - case channel <- update: - log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID) - return - default: - select { - case <-channel: - log.WithContext(ctx).Tracef("dropped oldest message from channel for peer %s", peerID) - default: - log.WithContext(ctx).Tracef("channel unexpectedly empty while trying to drop for peer %s", peerID) - } - } - } + channel.Push(update) } else { log.WithContext(ctx).Debugf("peer %s has no channel", peerID) } } // CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer. -func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage { +func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateBuffer { start := time.Now() closed := false @@ -88,22 +75,21 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c if channel, ok := p.peerChannels[peerID]; ok { closed = true + channel.Close() delete(p.peerChannels, peerID) - close(channel) } // mbragin: todo shouldn't it be more? or configurable? - channel := make(chan *UpdateMessage, channelBufferSize) - p.peerChannels[peerID] = channel + buffer := NewUpdateBuffer() log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID) - return channel + return buffer } func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) { if channel, ok := p.peerChannels[peerID]; ok { delete(p.peerChannels, peerID) - close(channel) + channel.Close() log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID) return