[management] Add more logs to the peer update processes (#2881)

This commit is contained in:
Pascal Fischer 2024-11-12 14:19:22 +01:00 committed by GitHub
parent 6cb697eed6
commit 20a5afc359
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 41 additions and 16 deletions

View File

@ -2129,7 +2129,7 @@ func (am *DefaultAccountManager) syncJWTGroups(ctx context.Context, accountID st
if settings.GroupsPropagationEnabled { if settings.GroupsPropagationEnabled {
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil { if err != nil {
return fmt.Errorf("error getting account: %w", err) return status.NewGetAccountError(err)
} }
if areGroupChangesAffectPeers(account, addNewGroups) || areGroupChangesAffectPeers(account, removeOldGroups) { if areGroupChangesAffectPeers(account, addNewGroups) || areGroupChangesAffectPeers(account, removeOldGroups) {
@ -2290,12 +2290,12 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
account, err := am.Store.GetAccount(ctx, accountID) account, err := am.Store.GetAccount(ctx, accountID)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, status.NewGetAccountError(err)
} }
peer, netMap, postureChecks, err := am.SyncPeer(ctx, PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, account) peer, netMap, postureChecks, err := am.SyncPeer(ctx, PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, account)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
} }
err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, account) err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, account)
@ -2314,7 +2314,7 @@ func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, account
account, err := am.Store.GetAccount(ctx, accountID) account, err := am.Store.GetAccount(ctx, accountID)
if err != nil { if err != nil {
return err return status.NewGetAccountError(err)
} }
err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, account) err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, account)

View File

@ -180,6 +180,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP) peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP)
if err != nil { if err != nil {
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
return mapError(ctx, err) return mapError(ctx, err)
} }
@ -207,6 +208,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
// handleUpdates sends updates to the connected peer until the updates channel is closed. // handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error { func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
for { for {
select { select {
// condition when there are some updates // condition when there are some updates
@ -260,10 +262,15 @@ func (s *GRPCServer) cancelPeerRoutines(ctx context.Context, accountID string, p
unlock := s.acquirePeerLockByUID(ctx, peer.Key) unlock := s.acquirePeerLockByUID(ctx, peer.Key)
defer unlock() defer unlock()
_ = s.accountManager.OnPeerDisconnected(ctx, accountID, peer.Key) err := s.accountManager.OnPeerDisconnected(ctx, accountID, peer.Key)
if err != nil {
log.WithContext(ctx).Errorf("failed to disconnect peer %s properly: %v", peer.Key, err)
}
s.peersUpdateManager.CloseChannel(ctx, peer.ID) s.peersUpdateManager.CloseChannel(ctx, peer.ID)
s.secretsManager.CancelRefresh(peer.ID) s.secretsManager.CancelRefresh(peer.ID)
s.ephemeralManager.OnPeerDisconnected(ctx, peer) s.ephemeralManager.OnPeerDisconnected(ctx, peer)
log.WithContext(ctx).Tracef("peer %s has been disconnected", peer.Key)
} }
func (s *GRPCServer) validateToken(ctx context.Context, jwtToken string) (string, error) { func (s *GRPCServer) validateToken(ctx context.Context, jwtToken string) (string, error) {

View File

@ -110,14 +110,16 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, account *Account) error { func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, account *Account) error {
peer, err := account.FindPeerByPubKey(peerPubKey) peer, err := account.FindPeerByPubKey(peerPubKey)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to find peer by pub key: %w", err)
} }
expired, err := am.updatePeerStatusAndLocation(ctx, peer, connected, realIP, account) expired, err := am.updatePeerStatusAndLocation(ctx, peer, connected, realIP, account)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to update peer status and location: %w", err)
} }
log.WithContext(ctx).Debugf("mark peer %s connected: %t", peer.ID, connected)
if peer.AddedWithSSOLogin() { if peer.AddedWithSSOLogin() {
if peer.LoginExpirationEnabled && account.Settings.PeerLoginExpirationEnabled { if peer.LoginExpirationEnabled && account.Settings.PeerLoginExpirationEnabled {
am.checkAndSchedulePeerLoginExpiration(ctx, account) am.checkAndSchedulePeerLoginExpiration(ctx, account)
@ -168,7 +170,7 @@ func (am *DefaultAccountManager) updatePeerStatusAndLocation(ctx context.Context
err := am.Store.SavePeerStatus(account.Id, peer.ID, *newStatus) err := am.Store.SavePeerStatus(account.Id, peer.ID, *newStatus)
if err != nil { if err != nil {
return false, err return false, fmt.Errorf("failed to save peer status: %w", err)
} }
return oldStatus.LoginExpired, nil return oldStatus.LoginExpired, nil
@ -587,7 +589,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil { if err != nil {
return nil, nil, nil, fmt.Errorf("error getting account: %w", err) return nil, nil, nil, status.NewGetAccountError(err)
} }
allGroup, err := account.GetGroupAll() allGroup, err := account.GetGroupAll()
@ -640,7 +642,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac
if peer.UserID != "" { if peer.UserID != "" {
user, err := account.FindUser(peer.UserID) user, err := account.FindUser(peer.UserID)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, fmt.Errorf("failed to get user: %w", err)
} }
err = checkIfPeerOwnerIsBlocked(peer, user) err = checkIfPeerOwnerIsBlocked(peer, user)
@ -657,7 +659,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac
if updated { if updated {
err = am.Store.SavePeer(ctx, account.Id, peer) err = am.Store.SavePeer(ctx, account.Id, peer)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, fmt.Errorf("failed to save peer: %w", err)
} }
if sync.UpdateAccountPeers { if sync.UpdateAccountPeers {
@ -667,7 +669,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac
peerNotValid, isStatusChanged, err := am.integratedPeerValidator.IsNotValidPeer(ctx, account.Id, peer, account.GetPeerGroupsList(peer.ID), account.Settings.Extra) peerNotValid, isStatusChanged, err := am.integratedPeerValidator.IsNotValidPeer(ctx, account.Id, peer, account.GetPeerGroupsList(peer.ID), account.Settings.Extra)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, fmt.Errorf("failed to validate peer: %w", err)
} }
var postureChecks []*posture.Checks var postureChecks []*posture.Checks
@ -685,7 +687,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac
validPeersMap, err := am.GetValidatedPeers(account) validPeersMap, err := am.GetValidatedPeers(account)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, fmt.Errorf("failed to get validated peers: %w", err)
} }
postureChecks = am.getPeerPostureChecks(account, peer) postureChecks = am.getPeerPostureChecks(account, peer)

View File

@ -135,3 +135,8 @@ func NewStoreContextCanceledError(duration time.Duration) error {
func NewInvalidKeyIDError() error { func NewInvalidKeyIDError() error {
return Errorf(InvalidArgument, "invalid key ID") return Errorf(InvalidArgument, "invalid key ID")
} }
// NewGetAccountError creates a new Error with Internal type for an issue getting account
func NewGetAccountError(err error) error {
return Errorf(Internal, "error getting account: %s", err)
}

View File

@ -96,9 +96,12 @@ func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID) delete(p.peerChannels, peerID)
close(channel) close(channel)
}
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID) log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
return
}
log.WithContext(ctx).Debugf("closing updates channel: peer %s has no channel", peerID)
} }
// CloseChannels closes updates channel for each given peer // CloseChannels closes updates channel for each given peer

View File

@ -9,14 +9,16 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/management/server/activity" "github.com/netbirdio/netbird/management/server/activity"
nbContext "github.com/netbirdio/netbird/management/server/context"
nbgroup "github.com/netbirdio/netbird/management/server/group" nbgroup "github.com/netbirdio/netbird/management/server/group"
"github.com/netbirdio/netbird/management/server/idp" "github.com/netbirdio/netbird/management/server/idp"
"github.com/netbirdio/netbird/management/server/integration_reference" "github.com/netbirdio/netbird/management/server/integration_reference"
"github.com/netbirdio/netbird/management/server/jwtclaims" "github.com/netbirdio/netbird/management/server/jwtclaims"
nbpeer "github.com/netbirdio/netbird/management/server/peer" nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/status" "github.com/netbirdio/netbird/management/server/status"
log "github.com/sirupsen/logrus"
) )
const ( const (
@ -1105,6 +1107,9 @@ func (am *DefaultAccountManager) GetUsersFromAccount(ctx context.Context, accoun
func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, account *Account, peers []*nbpeer.Peer) error { func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, account *Account, peers []*nbpeer.Peer) error {
var peerIDs []string var peerIDs []string
for _, peer := range peers { for _, peer := range peers {
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peer.Key)
if peer.Status.LoginExpired { if peer.Status.LoginExpired {
continue continue
} }
@ -1112,8 +1117,11 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou
peer.MarkLoginExpired(true) peer.MarkLoginExpired(true)
account.UpdatePeer(peer) account.UpdatePeer(peer)
if err := am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status); err != nil { if err := am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status); err != nil {
return err return fmt.Errorf("failed saving peer status for peer %s: %s", peer.ID, err)
} }
log.WithContext(ctx).Tracef("mark peer %s login expired", peer.ID)
am.StoreEvent( am.StoreEvent(
ctx, ctx,
peer.UserID, peer.ID, account.Id, peer.UserID, peer.ID, account.Id,