use update buffer instead of channel

Pascal Fischer 2025-06-17 15:44:14 +02:00
parent 5e9ea122f7
commit 0c7cac81f0
3 changed files with 88 additions and 48 deletions

View File

@@ -184,7 +184,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
        return err
    }
    updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
    updateBuffer := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
    s.ephemeralManager.OnPeerConnected(ctx, peer)
@@ -199,37 +199,24 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
    log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
    return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
    return s.handleUpdates(ctx, accountID, peerKey, peer, updateBuffer, srv)
}

// handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error {
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates *UpdateBuffer, srv proto.ManagementService_SyncServer) error {
    log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
    for {
        select {
        // condition when there are some updates
        case update, open := <-updates:
            if s.appMetrics != nil {
                s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
            }
            if !open {
                log.WithContext(ctx).Debugf("updates channel for peer %s was closed", peerKey.String())
                s.cancelPeerRoutines(ctx, accountID, peer)
                return nil
            }
            log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
            if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
                return err
            }
        // condition when client <-> server connection has been terminated
        case <-srv.Context().Done():
            // happens when connection drops, e.g. client disconnects
            log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
        update, ok := updates.Pop(ctx)
        if !ok {
            log.WithContext(ctx).Debugf("update buffer for peer %s closed", peerKey.String())
            s.cancelPeerRoutines(ctx, accountID, peer)
            return srv.Context().Err()
            return nil
        }
        log.WithContext(ctx).Debugf("sending latest update to peer %s", peerKey.String())
        if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
            return err
        }
    }
}
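
The rewritten loop replaces the old two-way select (updates channel vs. srv.Context().Done()) with a single blocking Pop call that also observes the context. Below is a rough, self-contained sketch of that control flow only; the popper interface, the chanPopper stub, and the send callback are names made up for the sketch and are not part of the code in this commit.

```go
package main

import (
    "context"
    "fmt"
)

// popper stands in for the *UpdateBuffer parameter of handleUpdates.
type popper interface {
    Pop(ctx context.Context) (string, bool)
}

// chanPopper is a trivial channel-backed stand-in, just enough to drive the loop.
type chanPopper struct{ ch chan string }

func (c chanPopper) Pop(ctx context.Context) (string, bool) {
    select {
    case <-ctx.Done():
        return "", false
    case msg, open := <-c.ch:
        return msg, open
    }
}

// handleUpdates mirrors the shape of the rewritten loop: no select over the
// update source and the stream context, just Pop until it reports closure.
func handleUpdates(ctx context.Context, updates popper, send func(string) error) error {
    for {
        update, ok := updates.Pop(ctx)
        if !ok {
            fmt.Println("update source closed or context cancelled")
            return nil
        }
        if err := send(update); err != nil {
            return err
        }
    }
}

func main() {
    ch := make(chan string, 2)
    ch <- "update-1"
    ch <- "update-2"
    close(ch)

    _ = handleUpdates(context.Background(), chanPopper{ch: ch}, func(u string) error {
        fmt.Println("sent", u)
        return nil
    })
}
```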

View File

@@ -0,0 +1,67 @@
package server

import (
    "context"
    "sync"
)

type UpdateBuffer struct {
    mu     sync.Mutex
    cond   *sync.Cond
    update *UpdateMessage
    closed bool
}

func NewUpdateBuffer() *UpdateBuffer {
    ub := &UpdateBuffer{}
    ub.cond = sync.NewCond(&ub.mu)
    return ub
}

func (b *UpdateBuffer) Push(update *UpdateMessage) {
    b.mu.Lock()
    defer b.mu.Unlock()

    // we need the equal case because we don't always increment the serial number
    if b.update == nil || update.NetworkMap.Network.Serial >= b.update.NetworkMap.Network.Serial {
        b.update = update
        b.cond.Signal()
    }
}

func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) {
    b.mu.Lock()
    defer b.mu.Unlock()

    for b.update == nil && !b.closed {
        waitCh := make(chan struct{})
        go func() {
            b.cond.Wait()
            close(waitCh)
        }()
        b.mu.Unlock()

        select {
        case <-ctx.Done():
            b.mu.Lock()
            return nil, false
        case <-waitCh:
            // Wakeup due to Push() or Close()
        }
        b.mu.Lock()
    }

    if b.closed {
        return nil, false
    }

    msg := b.update
    b.update = nil
    return msg, true
}

func (b *UpdateBuffer) Close() {
    b.mu.Lock()
    b.closed = true
    b.cond.Broadcast()
    b.mu.Unlock()
}
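
Taken together, Push and Pop implement a single-slot, latest-update-wins buffer: Push overwrites any update that has not been consumed yet (an equal or higher serial replaces the stored one), and Pop blocks until an update is available, the buffer is closed, or the caller's context is cancelled. The sketch below illustrates those semantics in a self-contained program; it is a simplified stand-in, not the committed implementation. fakeUpdate replaces the real UpdateMessage/NetworkMap types, and a one-element signal channel is used instead of sync.Cond to keep the example short.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// fakeUpdate is a stand-in for the real UpdateMessage/NetworkMap types.
type fakeUpdate struct {
    Serial uint64
}

// latestBuffer keeps only the newest pushed update, like the UpdateBuffer above,
// but signals the consumer through a one-element channel instead of sync.Cond.
type latestBuffer struct {
    mu     sync.Mutex
    update *fakeUpdate
    closed bool
    signal chan struct{} // holds at most one pending wake-up
}

func newLatestBuffer() *latestBuffer {
    return &latestBuffer{signal: make(chan struct{}, 1)}
}

func (b *latestBuffer) Push(u *fakeUpdate) {
    b.mu.Lock()
    // equal serials still replace the stored update, mirroring the ">=" check above
    if b.update == nil || u.Serial >= b.update.Serial {
        b.update = u
    }
    b.mu.Unlock()
    b.wake()
}

func (b *latestBuffer) Close() {
    b.mu.Lock()
    b.closed = true
    b.mu.Unlock()
    b.wake()
}

func (b *latestBuffer) wake() {
    select {
    case b.signal <- struct{}{}:
    default: // a wake-up is already pending
    }
}

// Pop blocks until an update is available, the buffer is closed, or ctx is done.
func (b *latestBuffer) Pop(ctx context.Context) (*fakeUpdate, bool) {
    for {
        b.mu.Lock()
        if b.update != nil {
            u := b.update
            b.update = nil
            b.mu.Unlock()
            return u, true
        }
        closed := b.closed
        b.mu.Unlock()
        if closed {
            return nil, false
        }
        select {
        case <-ctx.Done():
            return nil, false
        case <-b.signal:
            // woken by Push or Close, re-check state
        }
    }
}

func main() {
    buf := newLatestBuffer()
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    // several pushes before the consumer runs: only the newest serial survives
    buf.Push(&fakeUpdate{Serial: 1})
    buf.Push(&fakeUpdate{Serial: 2})
    buf.Push(&fakeUpdate{Serial: 2}) // same serial still replaces the stored update
    if u, ok := buf.Pop(ctx); ok {
        fmt.Println("got serial", u.Serial) // prints: got serial 2
    }

    buf.Close()
    if _, ok := buf.Pop(ctx); !ok {
        fmt.Println("buffer closed") // Pop unblocks once the buffer is closed
    }
}
```

Compared with the old buffered channel, which dropped the oldest queued message when full, a single-slot buffer keeps per-peer memory constant and always hands the peer the most recent network map, at the cost of never replaying intermediate updates.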

View File

@@ -21,7 +21,7 @@ type UpdateMessage struct {
type PeersUpdateManager struct {
    // peerChannels is an update channel indexed by Peer.ID
    peerChannels map[string]chan *UpdateMessage
    peerChannels map[string]*UpdateBuffer
    // channelsMux keeps the mutex to access peerChannels
    channelsMux *sync.RWMutex
    // metrics provides method to collect application metrics
@@ -31,7 +31,7 @@ type PeersUpdateManager struct {
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
    return &PeersUpdateManager{
        peerChannels: make(map[string]chan *UpdateMessage),
        peerChannels: make(map[string]*UpdateBuffer),
        channelsMux:  &sync.RWMutex{},
        metrics:      metrics,
    }
@@ -53,27 +53,14 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
    if channel, ok := p.peerChannels[peerID]; ok {
        found = true
        for {
            select {
            case channel <- update:
                log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
                return
            default:
                select {
                case <-channel:
                    log.WithContext(ctx).Tracef("dropped oldest message from channel for peer %s", peerID)
                default:
                    log.WithContext(ctx).Tracef("channel unexpectedly empty while trying to drop for peer %s", peerID)
                }
            }
        }
        channel.Push(update)
    } else {
        log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
    }
}

// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateBuffer {
    start := time.Now()
    closed := false
@@ -88,22 +75,21 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
    if channel, ok := p.peerChannels[peerID]; ok {
        closed = true
        channel.Close()
        delete(p.peerChannels, peerID)
        close(channel)
    }
    // mbragin: todo shouldn't it be more? or configurable?
    channel := make(chan *UpdateMessage, channelBufferSize)
    p.peerChannels[peerID] = channel
    buffer := NewUpdateBuffer()
    log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
    return channel
    return buffer
}

func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
    if channel, ok := p.peerChannels[peerID]; ok {
        delete(p.peerChannels, peerID)
        close(channel)
        channel.Close()
        log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
        return