mirror of
https://github.com/netbirdio/netbird.git
synced 2025-08-27 14:26:01 +02:00
Move Login business logic from gRPC API to Accountmanager (#713)
The Management gRPC API has too much business logic happening while it has to be in the Account manager. This also needs to make more requests to the store through the account manager.
This commit is contained in:
@@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
pb "github.com/golang/protobuf/proto" //nolint
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -118,44 +119,18 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
||||
log.Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, p.Addr.String())
|
||||
}
|
||||
|
||||
peerKey, err := wgtypes.ParseKey(req.GetWgPubKey())
|
||||
if err != nil {
|
||||
log.Warnf("error while parsing peer's Wireguard public key %s on Sync request.", peerKey.String())
|
||||
return status.Errorf(codes.InvalidArgument, "provided wgPubKey %s is invalid", peerKey.String())
|
||||
}
|
||||
|
||||
peer, err := s.accountManager.GetPeerByKey(peerKey.String())
|
||||
if err != nil {
|
||||
p, _ := gRPCPeer.FromContext(srv.Context())
|
||||
msg := status.Errorf(codes.PermissionDenied, "provided peer with the key wgPubKey %s is not registered, remote addr is %s", peerKey.String(), p.Addr.String())
|
||||
log.Debug(msg)
|
||||
return msg
|
||||
}
|
||||
|
||||
account, err := s.accountManager.GetAccountByPeerID(peer.ID)
|
||||
if err != nil {
|
||||
return status.Error(codes.Internal, "internal server error")
|
||||
}
|
||||
expired, left := peer.LoginExpired(account.Settings.PeerLoginExpiration)
|
||||
expired = account.Settings.PeerLoginExpirationEnabled && expired
|
||||
if peer.UserID != "" && (expired || peer.Status.LoginExpired) {
|
||||
err = s.accountManager.MarkPeerLoginExpired(peerKey.String(), true)
|
||||
if err != nil {
|
||||
log.Warnf("failed marking peer login expired %s %v", peerKey, err)
|
||||
}
|
||||
return status.Errorf(codes.PermissionDenied, "peer login has expired %v ago. Please log in once more", left)
|
||||
}
|
||||
|
||||
syncReq := &proto.SyncRequest{}
|
||||
err = encryption.DecryptMessage(peerKey, s.wgKey, req.Body, syncReq)
|
||||
peerKey, err := s.parseRequest(req, syncReq)
|
||||
if err != nil {
|
||||
p, _ := gRPCPeer.FromContext(srv.Context())
|
||||
msg := status.Errorf(codes.InvalidArgument, "invalid request message from %s,remote addr is %s", peerKey.String(), p.Addr.String())
|
||||
log.Debug(msg)
|
||||
return msg
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.sendInitialSync(peerKey, peer, srv)
|
||||
peer, netMap, err := s.accountManager.SyncPeer(PeerSync{WireGuardPubKey: peerKey.String()})
|
||||
if err != nil {
|
||||
return mapError(err)
|
||||
}
|
||||
|
||||
err = s.sendInitialSync(peerKey, peer, netMap, srv)
|
||||
if err != nil {
|
||||
log.Debugf("error while sending initial sync for %s: %v", peerKey.String(), err)
|
||||
return err
|
||||
@@ -218,7 +193,7 @@ func (s *GRPCServer) validateToken(jwtToken string) (string, error) {
|
||||
|
||||
token, err := s.jwtMiddleware.ValidateAndParse(jwtToken)
|
||||
if err != nil {
|
||||
return "", status.Errorf(codes.Internal, "invalid jwt token, err: %v", err)
|
||||
return "", status.Errorf(codes.InvalidArgument, "invalid jwt token, err: %v", err)
|
||||
}
|
||||
claims := s.jwtClaimsExtractor.FromToken(token)
|
||||
// we need to call this method because if user is new, we will automatically add it to existing or create a new account
|
||||
@@ -230,84 +205,52 @@ func (s *GRPCServer) validateToken(jwtToken string) (string, error) {
|
||||
return claims.UserId, nil
|
||||
}
|
||||
|
||||
func (s *GRPCServer) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Peer, error) {
|
||||
var (
|
||||
reqSetupKey string
|
||||
userID string
|
||||
err error
|
||||
)
|
||||
|
||||
if req.GetJwtToken() != "" {
|
||||
log.Debugln("using jwt token to register peer")
|
||||
userID, err = s.validateToken(req.JwtToken)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// maps internal internalStatus.Error to gRPC status.Error
|
||||
func mapError(err error) error {
|
||||
if e, ok := internalStatus.FromError(err); ok {
|
||||
switch e.Type() {
|
||||
case internalStatus.PermissionDenied:
|
||||
return status.Errorf(codes.PermissionDenied, e.Message)
|
||||
case internalStatus.Unauthorized:
|
||||
return status.Errorf(codes.PermissionDenied, e.Message)
|
||||
case internalStatus.Unauthenticated:
|
||||
return status.Errorf(codes.PermissionDenied, e.Message)
|
||||
case internalStatus.PreconditionFailed:
|
||||
return status.Errorf(codes.FailedPrecondition, e.Message)
|
||||
case internalStatus.NotFound:
|
||||
return status.Errorf(codes.NotFound, e.Message)
|
||||
default:
|
||||
}
|
||||
} else {
|
||||
log.Debugln("using setup key to register peer")
|
||||
reqSetupKey = req.GetSetupKey()
|
||||
userID = ""
|
||||
}
|
||||
return status.Errorf(codes.Internal, "failed handling request")
|
||||
}
|
||||
|
||||
meta := req.GetMeta()
|
||||
if meta == nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "peer meta data was not provided")
|
||||
func extractPeerMeta(loginReq *proto.LoginRequest) PeerSystemMeta {
|
||||
return PeerSystemMeta{
|
||||
Hostname: loginReq.GetMeta().GetHostname(),
|
||||
GoOS: loginReq.GetMeta().GetGoOS(),
|
||||
Kernel: loginReq.GetMeta().GetKernel(),
|
||||
Core: loginReq.GetMeta().GetCore(),
|
||||
Platform: loginReq.GetMeta().GetPlatform(),
|
||||
OS: loginReq.GetMeta().GetOS(),
|
||||
WtVersion: loginReq.GetMeta().GetWiretrusteeVersion(),
|
||||
UIVersion: loginReq.GetMeta().GetUiVersion(),
|
||||
}
|
||||
}
|
||||
|
||||
var sshKey []byte
|
||||
if req.GetPeerKeys() != nil {
|
||||
sshKey = req.GetPeerKeys().GetSshPubKey()
|
||||
}
|
||||
|
||||
peer, err := s.accountManager.AddPeer(reqSetupKey, userID, &Peer{
|
||||
Key: peerKey.String(),
|
||||
Name: meta.GetHostname(),
|
||||
SSHKey: string(sshKey),
|
||||
Meta: PeerSystemMeta{
|
||||
Hostname: meta.GetHostname(),
|
||||
GoOS: meta.GetGoOS(),
|
||||
Kernel: meta.GetKernel(),
|
||||
Core: meta.GetCore(),
|
||||
Platform: meta.GetPlatform(),
|
||||
OS: meta.GetOS(),
|
||||
WtVersion: meta.GetWiretrusteeVersion(),
|
||||
UIVersion: meta.GetUiVersion(),
|
||||
},
|
||||
})
|
||||
func (s *GRPCServer) parseRequest(req *proto.EncryptedMessage, parsed pb.Message) (wgtypes.Key, error) {
|
||||
peerKey, err := wgtypes.ParseKey(req.GetWgPubKey())
|
||||
if err != nil {
|
||||
if e, ok := internalStatus.FromError(err); ok {
|
||||
switch e.Type() {
|
||||
case internalStatus.PreconditionFailed:
|
||||
return nil, status.Errorf(codes.FailedPrecondition, e.Message)
|
||||
case internalStatus.NotFound:
|
||||
return nil, status.Errorf(codes.NotFound, e.Message)
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil, status.Errorf(codes.Internal, "failed registering new peer")
|
||||
log.Warnf("error while parsing peer's WireGuard public key %s.", req.WgPubKey)
|
||||
return wgtypes.Key{}, status.Errorf(codes.InvalidArgument, "provided wgPubKey %s is invalid", req.WgPubKey)
|
||||
}
|
||||
|
||||
// todo move to DefaultAccountManager the code below
|
||||
networkMap, err := s.accountManager.GetNetworkMap(peer.ID)
|
||||
err = encryption.DecryptMessage(peerKey, s.wgKey, req.Body, parsed)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "unable to fetch network map after registering peer, error: %v", err)
|
||||
}
|
||||
// notify other peers of our registration
|
||||
for _, remotePeer := range networkMap.Peers {
|
||||
remotePeerNetworkMap, err := s.accountManager.GetNetworkMap(remotePeer.ID)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "unable to fetch network map after registering peer, error: %v", err)
|
||||
}
|
||||
|
||||
update := toSyncResponse(s.config, remotePeer, nil, remotePeerNetworkMap, s.accountManager.GetDNSDomain())
|
||||
err = s.peersUpdateManager.SendUpdate(remotePeer.ID, &UpdateMessage{Update: update})
|
||||
if err != nil {
|
||||
// todo rethink if we should keep this return
|
||||
return nil, status.Errorf(codes.Internal, "unable to send update after registering peer, error: %v", err)
|
||||
}
|
||||
return wgtypes.Key{}, status.Errorf(codes.InvalidArgument, "invalid request message")
|
||||
}
|
||||
|
||||
return peer, nil
|
||||
return peerKey, nil
|
||||
}
|
||||
|
||||
// Login endpoint first checks whether peer is registered under any account
|
||||
@@ -323,113 +266,55 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
|
||||
log.Debugf("Login request from peer [%s] [%s]", req.WgPubKey, p.Addr.String())
|
||||
}
|
||||
|
||||
peerKey, err := wgtypes.ParseKey(req.GetWgPubKey())
|
||||
if err != nil {
|
||||
log.Warnf("error while parsing peer's Wireguard public key %s on Sync request.", req.WgPubKey)
|
||||
return nil, status.Errorf(codes.InvalidArgument, "provided wgPubKey %s is invalid", req.WgPubKey)
|
||||
}
|
||||
|
||||
loginReq := &proto.LoginRequest{}
|
||||
err = encryption.DecryptMessage(peerKey, s.wgKey, req.Body, loginReq)
|
||||
peerKey, err := s.parseRequest(req, loginReq)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "invalid request message")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peer, err := s.accountManager.GetPeerByKey(peerKey.String())
|
||||
if err != nil {
|
||||
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
|
||||
// peer doesn't exist -> check if setup key was provided
|
||||
if loginReq.GetJwtToken() == "" && loginReq.GetSetupKey() == "" {
|
||||
// absent setup key or jwt -> permission denied
|
||||
p, _ := gRPCPeer.FromContext(ctx)
|
||||
msg := status.Errorf(codes.PermissionDenied,
|
||||
"provided peer with the key wgPubKey %s is not registered and no setup key or jwt was provided,"+
|
||||
" remote addr is %s", peerKey.String(), p.Addr.String())
|
||||
log.Debug(msg)
|
||||
return nil, msg
|
||||
}
|
||||
if loginReq.GetMeta() == nil {
|
||||
msg := status.Errorf(codes.FailedPrecondition,
|
||||
"peer system meta has to be provided to log in. Peer %s, remote addr %s", peerKey.String(),
|
||||
p.Addr.String())
|
||||
log.Warn(msg)
|
||||
return nil, msg
|
||||
}
|
||||
|
||||
// setup key or jwt is present -> try normal registration flow
|
||||
peer, err = s.registerPeer(peerKey, loginReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
} else {
|
||||
return nil, status.Error(codes.Internal, "internal server error")
|
||||
}
|
||||
} else if loginReq.GetMeta() != nil {
|
||||
// update peer's system meta data on Login
|
||||
err = s.accountManager.UpdatePeerMeta(peer.ID, PeerSystemMeta{
|
||||
Hostname: loginReq.GetMeta().GetHostname(),
|
||||
GoOS: loginReq.GetMeta().GetGoOS(),
|
||||
Kernel: loginReq.GetMeta().GetKernel(),
|
||||
Core: loginReq.GetMeta().GetCore(),
|
||||
Platform: loginReq.GetMeta().GetPlatform(),
|
||||
OS: loginReq.GetMeta().GetOS(),
|
||||
WtVersion: loginReq.GetMeta().GetWiretrusteeVersion(),
|
||||
UIVersion: loginReq.GetMeta().GetUiVersion(),
|
||||
},
|
||||
)
|
||||
userID := ""
|
||||
// JWT token is not always provided, it is fine for userID to be empty cuz it might be that peer is already registered,
|
||||
// or it uses a setup key to register.
|
||||
if loginReq.GetJwtToken() != "" {
|
||||
userID, err = s.validateToken(loginReq.GetJwtToken())
|
||||
if err != nil {
|
||||
log.Errorf("failed updating peer system meta data %s", peerKey.String())
|
||||
return nil, status.Error(codes.Internal, "internal server error")
|
||||
log.Warnf("failed validating JWT token sent from peer %s", peerKey)
|
||||
return nil, mapError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// check if peer login has expired
|
||||
account, err := s.accountManager.GetAccountByPeerID(peer.ID)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, "internal server error")
|
||||
}
|
||||
|
||||
expired, left := peer.LoginExpired(account.Settings.PeerLoginExpiration)
|
||||
expired = account.Settings.PeerLoginExpirationEnabled && expired
|
||||
if peer.UserID != "" && (expired || peer.Status.LoginExpired) {
|
||||
// it might be that peer expired but user has logged in already, check token then
|
||||
if loginReq.GetJwtToken() == "" {
|
||||
err = s.accountManager.MarkPeerLoginExpired(peerKey.String(), true)
|
||||
if err != nil {
|
||||
log.Warnf("failed marking peer login expired %s %v", peerKey, err)
|
||||
}
|
||||
return nil, status.Errorf(codes.PermissionDenied,
|
||||
"peer login has expired %v ago. Please log in once more", left)
|
||||
}
|
||||
_, err = s.validateToken(loginReq.GetJwtToken())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.accountManager.UpdatePeerLastLogin(peer.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var sshKey []byte
|
||||
if loginReq.GetPeerKeys() != nil {
|
||||
sshKey = loginReq.GetPeerKeys().GetSshPubKey()
|
||||
}
|
||||
|
||||
if len(sshKey) > 0 {
|
||||
err = s.accountManager.UpdatePeerSSHKey(peer.ID, string(sshKey))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
network, err := s.accountManager.GetPeerNetwork(peer.ID)
|
||||
peer, netMap, err := s.accountManager.LoginPeer(PeerLogin{
|
||||
WireGuardPubKey: peerKey.String(),
|
||||
SSHKey: string(sshKey),
|
||||
Meta: extractPeerMeta(loginReq),
|
||||
UserID: userID,
|
||||
SetupKey: loginReq.GetSetupKey(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed getting peer network on login")
|
||||
log.Warnf("failed logging in peer %s", peerKey)
|
||||
return nil, mapError(err)
|
||||
}
|
||||
|
||||
// if peer has reached this point then it has logged in
|
||||
loginResp := &proto.LoginResponse{
|
||||
WiretrusteeConfig: toWiretrusteeConfig(s.config, nil),
|
||||
PeerConfig: toPeerConfig(peer, network, s.accountManager.GetDNSDomain()),
|
||||
PeerConfig: toPeerConfig(peer, netMap.Network, s.accountManager.GetDNSDomain()),
|
||||
}
|
||||
encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, loginResp)
|
||||
if err != nil {
|
||||
log.Warnf("failed encrypting peer %s message", peer.ID)
|
||||
return nil, status.Errorf(codes.Internal, "failed logging in peer")
|
||||
}
|
||||
|
||||
@@ -555,13 +440,7 @@ func (s *GRPCServer) IsHealthy(ctx context.Context, req *proto.Empty) (*proto.Em
|
||||
}
|
||||
|
||||
// sendInitialSync sends initial proto.SyncResponse to the peer requesting synchronization
|
||||
func (s *GRPCServer) sendInitialSync(peerKey wgtypes.Key, peer *Peer, srv proto.ManagementService_SyncServer) error {
|
||||
networkMap, err := s.accountManager.GetNetworkMap(peer.ID)
|
||||
if err != nil {
|
||||
log.Warnf("error getting a list of peers for a peer %s", peer.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *GRPCServer) sendInitialSync(peerKey wgtypes.Key, peer *Peer, networkMap *NetworkMap, srv proto.ManagementService_SyncServer) error {
|
||||
// make secret time based TURN credentials optional
|
||||
var turnCredentials *TURNCredentials
|
||||
if s.config.TURNConfig.TimeBasedCredentials {
|
||||
|
Reference in New Issue
Block a user