chore: [management] - replace proactive peer updates with periodic updates

This commit is contained in:
braginini
2022-01-01 14:47:57 +01:00
parent 828410b34c
commit 09eeb71af2
3 changed files with 46 additions and 12 deletions

View File

@ -3,6 +3,7 @@ package server
import ( import (
"context" "context"
"fmt" "fmt"
"math/rand"
"time" "time"
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
@ -95,6 +96,8 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
if s.config.TURNConfig.TimeBasedCredentials { if s.config.TURNConfig.TimeBasedCredentials {
s.turnCredentialsManager.SetupRefresh(peerKey.String()) s.turnCredentialsManager.SetupRefresh(peerKey.String())
} }
s.schedulePeerUpdates(srv.Context(), peerKey.String(), peer)
// keep a connection to the peer and send updates when available // keep a connection to the peer and send updates when available
for { for {
select { select {
@ -135,6 +138,36 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
} }
} }
func (s *Server) schedulePeerUpdates(context context.Context, peerKey string, peer *Peer) {
//todo: introduce the following logic:
// add a ModificationId to the Account entity (ModificationId increments by 1 if there was a change to the account network map)
// periodically fetch changes of the Account providing ModificationId
// if ModificationId is < then the one of the Account, then send changes
// Client has to handle modification id as well
go func() {
for {
select {
case <-context.Done():
log.Debugf("peer update cancelled %s", peerKey)
return
default:
maxSleep := 6
minSleep := 3
sleep := rand.Intn(maxSleep-minSleep) + minSleep
time.Sleep(time.Duration(sleep) * time.Second)
peers, err := s.accountManager.GetPeersForAPeer(peerKey)
if err != nil {
continue
}
update := toSyncResponse(s.config, peer, peers, nil)
err = s.peersUpdateManager.SendUpdate(peerKey, &UpdateMessage{Update: update})
}
}
}()
}
func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Peer, error) { func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Peer, error) {
meta := req.GetMeta() meta := req.GetMeta()
@ -158,12 +191,13 @@ func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Pe
return nil, status.Errorf(codes.NotFound, "provided setup key doesn't exists") return nil, status.Errorf(codes.NotFound, "provided setup key doesn't exists")
} }
peers, err := s.accountManager.GetPeersForAPeer(peer.Key) // notify other peers of our registration - uncomment if you want to bring back peer update logic
/*peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, "internal server error") return nil, status.Error(codes.Internal, "internal server error")
} }
// notify other peers of our registration
for _, remotePeer := range peers { for _, remotePeer := range peers {
// exclude notified peer and add ourselves // exclude notified peer and add ourselves
peersToSend := []*Peer{peer} peersToSend := []*Peer{peer}
@ -178,7 +212,7 @@ func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Pe
// todo rethink if we should keep this return // todo rethink if we should keep this return
return nil, err return nil, err
} }
} }*/
return peer, nil return peer, nil
} }

View File

@ -321,9 +321,9 @@ var _ = Describe("Management service", func() {
}) })
}) })
Context("when there are 50 peers registered under one account", func() { Context("when there are 30 peers registered under one account", func() {
Context("when there are 10 more peers registered under the same account", func() { Context("when there are 10 more peers registered under the same account", func() {
Specify("all of the 50 peers will get updates of 10 newly registered peers", func() { Specify("all of the 20 peers will have 29 peer to connect to (total 30-1 itself)", func() {
initialPeers := 20 initialPeers := 20
additionalPeers := 10 additionalPeers := 10
@ -336,7 +336,7 @@ var _ = Describe("Management service", func() {
} }
wg := sync2.WaitGroup{} wg := sync2.WaitGroup{}
wg.Add(initialPeers + initialPeers*additionalPeers) wg.Add(initialPeers)
var clients []mgmtProto.ManagementService_SyncClient var clients []mgmtProto.ManagementService_SyncClient
for _, peer := range peers { for _, peer := range peers {
@ -368,9 +368,10 @@ var _ = Describe("Management service", func() {
resp := &mgmtProto.SyncResponse{} resp := &mgmtProto.SyncResponse{}
err = pb.Unmarshal(decryptedBytes, resp) err = pb.Unmarshal(decryptedBytes, resp)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if len(resp.GetRemotePeers()) > 0 { if len(resp.GetRemotePeers()) == 29 {
//only consider peer updates //only consider peer updates
wg.Done() wg.Done()
return
} }
} }
}() }()

View File

@ -1,7 +1,6 @@
package server package server
import ( import (
"github.com/wiretrustee/wiretrustee/management/proto"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"net" "net"
@ -123,7 +122,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
return nil, err return nil, err
} }
err = am.peersUpdateManager.SendUpdate(peerKey, /*err = am.peersUpdateManager.SendUpdate(peerKey,
&UpdateMessage{ &UpdateMessage{
Update: &proto.SyncResponse{ Update: &proto.SyncResponse{
RemotePeers: []*proto.RemotePeerConfig{}, RemotePeers: []*proto.RemotePeerConfig{},
@ -131,7 +130,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
}}) }})
if err != nil { if err != nil {
return nil, err return nil, err
} }*/
//notify other peers of the change //notify other peers of the change
peers, err := am.Store.GetAccountPeers(accountId) peers, err := am.Store.GetAccountPeers(accountId)
@ -146,7 +145,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
peersToSend = append(peersToSend, remote) peersToSend = append(peersToSend, remote)
} }
} }
update := toRemotePeerConfig(peersToSend) /*update := toRemotePeerConfig(peersToSend)
err = am.peersUpdateManager.SendUpdate(p.Key, err = am.peersUpdateManager.SendUpdate(p.Key,
&UpdateMessage{ &UpdateMessage{
Update: &proto.SyncResponse{ Update: &proto.SyncResponse{
@ -155,7 +154,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
}}) }})
if err != nil { if err != nil {
return nil, err return nil, err
} }*/
} }
am.peersUpdateManager.CloseChannel(peerKey) am.peersUpdateManager.CloseChannel(peerKey)