mirror of
https://github.com/netbirdio/netbird.git
synced 2025-04-09 17:50:15 +02:00
Move write peer lock (#2364)
Moved the write peer lock to avoid latency caused by disk access Updated the method CancelPeerRoutines to use the peer public key
This commit is contained in:
parent
c832cef44c
commit
5ee9c77e90
@ -136,7 +136,7 @@ type AccountManager interface {
|
|||||||
GroupValidation(ctx context.Context, accountId string, groups []string) (bool, error)
|
GroupValidation(ctx context.Context, accountId string, groups []string) (bool, error)
|
||||||
GetValidatedPeers(account *Account) (map[string]struct{}, error)
|
GetValidatedPeers(account *Account) (map[string]struct{}, error)
|
||||||
SyncAndMarkPeer(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error)
|
SyncAndMarkPeer(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error)
|
||||||
CancelPeerRoutines(ctx context.Context, peer *nbpeer.Peer) error
|
CancelPeerRoutines(ctx context.Context, peerPubKey string) error
|
||||||
SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error
|
SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error
|
||||||
FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error)
|
FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error)
|
||||||
GetAccountIDForPeerKey(ctx context.Context, peerKey string) (string, error)
|
GetAccountIDForPeerKey(ctx context.Context, peerKey string) (string, error)
|
||||||
@ -1858,6 +1858,11 @@ func (am *DefaultAccountManager) getAccountWithAuthorizationClaims(ctx context.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error) {
|
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error) {
|
||||||
|
// acquiring peer write lock here is ok since we only modify peer information that is supplied by the
|
||||||
|
// peer itself which can't be modified by API, and it only happens after an account read lock is acquired
|
||||||
|
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
||||||
|
defer peerUnlock()
|
||||||
|
|
||||||
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peerPubKey)
|
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peerPubKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errStatus, ok := status.FromError(err); ok && errStatus.Type() == status.NotFound {
|
if errStatus, ok := status.FromError(err); ok && errStatus.Type() == status.NotFound {
|
||||||
@ -1868,8 +1873,6 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, peerPubKey
|
|||||||
|
|
||||||
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
||||||
defer accountUnlock()
|
defer accountUnlock()
|
||||||
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
|
||||||
defer peerUnlock()
|
|
||||||
|
|
||||||
account, err := am.Store.GetAccount(ctx, accountID)
|
account, err := am.Store.GetAccount(ctx, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1889,8 +1892,13 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, peerPubKey
|
|||||||
return peer, netMap, postureChecks, nil
|
return peer, netMap, postureChecks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) CancelPeerRoutines(ctx context.Context, peer *nbpeer.Peer) error {
|
func (am *DefaultAccountManager) CancelPeerRoutines(ctx context.Context, peerPubKey string) error {
|
||||||
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peer.Key)
|
// acquiring peer write lock here is ok since we only modify peer information that is supplied by the
|
||||||
|
// peer itself which can't be modified by API, and it only happens after an account read lock is acquired
|
||||||
|
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
||||||
|
defer peerUnlock()
|
||||||
|
|
||||||
|
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peerPubKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errStatus, ok := status.FromError(err); ok && errStatus.Type() == status.NotFound {
|
if errStatus, ok := status.FromError(err); ok && errStatus.Type() == status.NotFound {
|
||||||
return status.Errorf(status.Unauthenticated, "peer not registered")
|
return status.Errorf(status.Unauthenticated, "peer not registered")
|
||||||
@ -1900,17 +1908,15 @@ func (am *DefaultAccountManager) CancelPeerRoutines(ctx context.Context, peer *n
|
|||||||
|
|
||||||
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
||||||
defer accountUnlock()
|
defer accountUnlock()
|
||||||
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peer.Key)
|
|
||||||
defer peerUnlock()
|
|
||||||
|
|
||||||
account, err := am.Store.GetAccount(ctx, accountID)
|
account, err := am.Store.GetAccount(ctx, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = am.MarkPeerConnected(ctx, peer.Key, false, nil, account)
|
err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, account)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peer.Key, err)
|
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -236,7 +236,7 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, peerKey wgtypes.Key, peer *
|
|||||||
func (s *GRPCServer) cancelPeerRoutines(ctx context.Context, peer *nbpeer.Peer) {
|
func (s *GRPCServer) cancelPeerRoutines(ctx context.Context, peer *nbpeer.Peer) {
|
||||||
s.peersUpdateManager.CloseChannel(ctx, peer.ID)
|
s.peersUpdateManager.CloseChannel(ctx, peer.ID)
|
||||||
s.turnCredentialsManager.CancelRefresh(peer.ID)
|
s.turnCredentialsManager.CancelRefresh(peer.ID)
|
||||||
_ = s.accountManager.CancelPeerRoutines(ctx, peer)
|
_ = s.accountManager.CancelPeerRoutines(ctx, peer.Key)
|
||||||
s.ephemeralManager.OnPeerDisconnected(ctx, peer)
|
s.ephemeralManager.OnPeerDisconnected(ctx, peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ func (am *MockAccountManager) SyncAndMarkPeer(ctx context.Context, peerPubKey st
|
|||||||
return nil, nil, nil, status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented")
|
return nil, nil, nil, status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *MockAccountManager) CancelPeerRoutines(_ context.Context, peer *nbpeer.Peer) error {
|
func (am *MockAccountManager) CancelPeerRoutines(_ context.Context, peerPubKey string) error {
|
||||||
// TODO implement me
|
// TODO implement me
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user