diff --git a/management/server/peer.go b/management/server/peer.go index e026c4c61..5174984c4 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -390,6 +390,11 @@ func (am *DefaultAccountManager) UpdatePeerSSHKey(peerKey string, sshKey string) return err } + if peer.SSHKey == sshKey { + log.Debugf("same SSH key provided for peer %s, skipping update", peerKey) + return nil + } + account, err := am.Store.GetPeerAccount(peerKey) if err != nil { return err diff --git a/management/server/turncredentials.go b/management/server/turncredentials.go index 380fe08fb..82ce0a329 100644 --- a/management/server/turncredentials.go +++ b/management/server/turncredentials.go @@ -85,15 +85,18 @@ func (m *TimeBasedAuthSecretsManager) SetupRefresh(peerKey string) { m.cancel(peerKey) cancel := make(chan struct{}, 1) m.cancelMap[peerKey] = cancel + log.Debugf("starting turn refresh for %s", peerKey) + go func() { + //we don't want to regenerate credentials right on expiration, so we do it slightly before (at 3/4 of TTL) + ticker := time.NewTicker(m.config.CredentialsTTL.Duration / 4 * 3) + for { select { case <-cancel: + log.Debugf("stopping turn refresh for %s", peerKey) return - default: - //we don't want to regenerate credentials right on expiration, so we do it slightly before (at 3/4 of TTL) - time.Sleep(m.config.CredentialsTTL.Duration / 4 * 3) - + case <-ticker.C: c := m.GenerateCredentials() var turns []*proto.ProtectedHostConfig for _, host := range m.config.Turns { diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index ba443c8c1..22f5291f3 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -6,6 +6,8 @@ import ( "sync" ) +const channelBufferSize = 100 + type UpdateMessage struct { Update *proto.SyncResponse } @@ -28,7 +30,12 @@ func (p *PeersUpdateManager) SendUpdate(peer string, update *UpdateMessage) erro p.channelsMux.Lock() defer p.channelsMux.Unlock() if channel, ok := p.peerChannels[peer]; ok { - channel <- update + select { + case channel <- update: + log.Infof("update was sent to channel for peer %s", peer) + default: + log.Warnf("channel for peer %s is %d full", peer, len(channel)) + } return nil } log.Debugf("peer %s has no channel", peer) @@ -45,7 +52,7 @@ func (p *PeersUpdateManager) CreateChannel(peerKey string) chan *UpdateMessage { close(channel) } //mbragin: todo shouldn't it be more? or configurable? - channel := make(chan *UpdateMessage, 100) + channel := make(chan *UpdateMessage, channelBufferSize) p.peerChannels[peerKey] = channel log.Debugf("opened updates channel for a peer %s", peerKey) diff --git a/management/server/updatechannel_test.go b/management/server/updatechannel_test.go index d0352e0fa..c37cd4228 100644 --- a/management/server/updatechannel_test.go +++ b/management/server/updatechannel_test.go @@ -3,13 +3,14 @@ package server import ( "github.com/netbirdio/netbird/management/proto" "testing" + "time" ) -var peersUpdater *PeersUpdateManager +//var peersUpdater *PeersUpdateManager func TestCreateChannel(t *testing.T) { peer := "test-create" - peersUpdater = NewPeersUpdateManager() + peersUpdater := NewPeersUpdateManager() defer peersUpdater.CloseChannel(peer) _ = peersUpdater.CreateChannel(peer) @@ -20,12 +21,17 @@ func TestCreateChannel(t *testing.T) { func TestSendUpdate(t *testing.T) { peer := "test-sendupdate" - update := &UpdateMessage{Update: &proto.SyncResponse{}} + peersUpdater := NewPeersUpdateManager() + update1 := &UpdateMessage{Update: &proto.SyncResponse{ + NetworkMap: &proto.NetworkMap{ + Serial: 0, + }, + }} _ = peersUpdater.CreateChannel(peer) if _, ok := peersUpdater.peerChannels[peer]; !ok { t.Error("Error creating the channel") } - err := peersUpdater.SendUpdate(peer, update) + err := peersUpdater.SendUpdate(peer, update1) if err != nil { t.Error("Error sending update: ", err) } @@ -34,10 +40,41 @@ func TestSendUpdate(t *testing.T) { default: t.Error("Update wasn't send") } + + for range [channelBufferSize]int{} { + err = peersUpdater.SendUpdate(peer, update1) + if err != nil { + t.Errorf("got an early error sending update: %v ", err) + } + } + + update2 := &UpdateMessage{Update: &proto.SyncResponse{ + NetworkMap: &proto.NetworkMap{ + Serial: 10, + }, + }} + + err = peersUpdater.SendUpdate(peer, update2) + if err != nil { + t.Error("update shouldn't return an error when channel buffer is full") + } + timeout := time.After(5 * time.Second) + for range [channelBufferSize]int{} { + select { + case <-timeout: + t.Error("timed out reading previously sent updates") + case updateReader := <-peersUpdater.peerChannels[peer]: + if updateReader.Update.NetworkMap.Serial == update2.Update.NetworkMap.Serial { + t.Error("got the update that shouldn't have been sent") + } + } + } + } func TestCloseChannel(t *testing.T) { peer := "test-close" + peersUpdater := NewPeersUpdateManager() _ = peersUpdater.CreateChannel(peer) if _, ok := peersUpdater.peerChannels[peer]; !ok { t.Error("Error creating the channel")