mirror of
https://github.com/netbirdio/netbird.git
synced 2024-12-24 15:48:52 +01:00
2c2c1e19df
* feature: add config properties to the SyncResponse of the management gRpc service * fix: lint errors * chore: modify management protocol according to the review notes * fix: management proto fields sequence * feature: add proper peer configuration to be synced * chore: minor changes * feature: finalize peer config management * fix: lint errors * feature: add management server config file * refactor: extract hosts-config to a separate file * refactor: review notes applied to correct file_store usage * refactor: extract management service configuration to a file * refactor: simplify management config
296 lines
8.5 KiB
Go
296 lines
8.5 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/ptypes/timestamp"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/wiretrustee/wiretrustee/encryption"
|
|
"github.com/wiretrustee/wiretrustee/management/proto"
|
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// Server an instance of a Management server
|
|
type Server struct {
|
|
accountManager *AccountManager
|
|
wgKey wgtypes.Key
|
|
proto.UnimplementedManagementServiceServer
|
|
peerChannels map[string]chan *UpdateChannelMessage
|
|
channelsMux *sync.Mutex
|
|
config *Config
|
|
}
|
|
|
|
// AllowedIPsFormat generates Wireguard AllowedIPs format (e.g. 100.30.30.1/32)
|
|
const AllowedIPsFormat = "%s/32"
|
|
|
|
type UpdateChannelMessage struct {
|
|
Update *proto.SyncResponse
|
|
}
|
|
|
|
// NewServer creates a new Management server
|
|
func NewServer(config *Config) (*Server, error) {
|
|
key, err := wgtypes.GeneratePrivateKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
store, err := NewStore(config.Datadir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Server{
|
|
wgKey: key,
|
|
// peerKey -> event channel
|
|
peerChannels: make(map[string]chan *UpdateChannelMessage),
|
|
channelsMux: &sync.Mutex{},
|
|
accountManager: NewManager(store),
|
|
config: config,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Server) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) {
|
|
|
|
// todo introduce something more meaningful with the key expiration/rotation
|
|
now := time.Now().Add(24 * time.Hour)
|
|
secs := int64(now.Second())
|
|
nanos := int32(now.Nanosecond())
|
|
expiresAt := ×tamp.Timestamp{Seconds: secs, Nanos: nanos}
|
|
|
|
return &proto.ServerKeyResponse{
|
|
Key: s.wgKey.PublicKey().String(),
|
|
ExpiresAt: expiresAt,
|
|
}, nil
|
|
}
|
|
|
|
//Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
|
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
|
func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
|
|
|
log.Debugf("Sync request from peer %s", req.WgPubKey)
|
|
|
|
peerKey, err := wgtypes.ParseKey(req.GetWgPubKey())
|
|
if err != nil {
|
|
log.Warnf("error while parsing peer's Wireguard public key %s on Sync request.", peerKey.String())
|
|
return status.Errorf(codes.InvalidArgument, "provided wgPubKey %s is invalid", peerKey.String())
|
|
}
|
|
|
|
peer, err := s.accountManager.GetPeer(peerKey.String())
|
|
if err != nil {
|
|
return status.Errorf(codes.Unauthenticated, "provided peer with the key wgPubKey %s is not registered", peerKey.String())
|
|
}
|
|
|
|
syncReq := &proto.SyncRequest{}
|
|
err = encryption.DecryptMessage(peerKey, s.wgKey, req.Body, syncReq)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "invalid request message")
|
|
}
|
|
|
|
err = s.sendInitialSync(peerKey, peer, srv)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
updates := s.openUpdatesChannel(peerKey.String())
|
|
|
|
// keep a connection to the peer and send updates when available
|
|
for {
|
|
select {
|
|
// condition when there are some updates
|
|
case update, open := <-updates:
|
|
if !open {
|
|
// updates channel has been closed
|
|
return nil
|
|
}
|
|
log.Debugf("recevied an update for peer %s", peerKey.String())
|
|
|
|
encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, update.Update)
|
|
if err != nil {
|
|
return status.Errorf(codes.Internal, "failed processing update message")
|
|
}
|
|
|
|
err = srv.SendMsg(&proto.EncryptedMessage{
|
|
WgPubKey: s.wgKey.PublicKey().String(),
|
|
Body: encryptedResp,
|
|
})
|
|
if err != nil {
|
|
return status.Errorf(codes.Internal, "failed sending update message")
|
|
}
|
|
// condition when client <-> server connection has been terminated
|
|
case <-srv.Context().Done():
|
|
// happens when connection drops, e.g. client disconnects
|
|
log.Debugf("stream of peer %s has been closed", peerKey.String())
|
|
s.closeUpdatesChannel(peerKey.String())
|
|
return srv.Context().Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
// RegisterPeer adds a peer to the Store. Returns 404 in case the provided setup key doesn't exist
|
|
func (s *Server) RegisterPeer(ctx context.Context, req *proto.RegisterPeerRequest) (*proto.RegisterPeerResponse, error) {
|
|
|
|
log.Debugf("RegisterPeer request from peer %s", req.Key)
|
|
|
|
s.channelsMux.Lock()
|
|
defer s.channelsMux.Unlock()
|
|
|
|
peer, err := s.accountManager.AddPeer(req.SetupKey, req.Key)
|
|
if err != nil {
|
|
return &proto.RegisterPeerResponse{}, status.Errorf(codes.NotFound, "provided setup key doesn't exists")
|
|
}
|
|
|
|
peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
|
|
if err != nil {
|
|
//todo return a proper error
|
|
return nil, err
|
|
}
|
|
|
|
// notify other peers of our registration
|
|
for _, remotePeer := range peers {
|
|
if channel, ok := s.peerChannels[remotePeer.Key]; ok {
|
|
// exclude notified peer and add ourselves
|
|
peersToSend := []*Peer{peer}
|
|
for _, p := range peers {
|
|
if remotePeer.Key != p.Key {
|
|
peersToSend = append(peersToSend, p)
|
|
}
|
|
}
|
|
update := toSyncResponse(s.config, peer, peersToSend)
|
|
channel <- &UpdateChannelMessage{Update: update}
|
|
}
|
|
}
|
|
|
|
return &proto.RegisterPeerResponse{}, nil
|
|
}
|
|
|
|
func toResponseProto(configProto Protocol) proto.HostConfig_Protocol {
|
|
switch configProto {
|
|
case UDP:
|
|
return proto.HostConfig_UDP
|
|
case DTLS:
|
|
return proto.HostConfig_DTLS
|
|
case HTTP:
|
|
return proto.HostConfig_HTTP
|
|
case HTTPS:
|
|
return proto.HostConfig_HTTPS
|
|
case TCP:
|
|
return proto.HostConfig_TCP
|
|
default:
|
|
//mbragin: todo something better?
|
|
panic(fmt.Errorf("unexpected config protocol type %v", configProto))
|
|
}
|
|
}
|
|
|
|
func toSyncResponse(config *Config, peer *Peer, peers []*Peer) *proto.SyncResponse {
|
|
|
|
var stuns []*proto.HostConfig
|
|
for _, stun := range config.Stuns {
|
|
stuns = append(stuns, &proto.HostConfig{
|
|
Uri: stun.URI,
|
|
Protocol: toResponseProto(stun.Proto),
|
|
})
|
|
}
|
|
var turns []*proto.ProtectedHostConfig
|
|
for _, turn := range config.Turns {
|
|
turns = append(turns, &proto.ProtectedHostConfig{
|
|
HostConfig: &proto.HostConfig{
|
|
Uri: turn.URI,
|
|
Protocol: toResponseProto(turn.Proto),
|
|
},
|
|
User: turn.Username,
|
|
Password: string(turn.Password),
|
|
})
|
|
}
|
|
|
|
wtConfig := &proto.WiretrusteeConfig{
|
|
Stuns: stuns,
|
|
Turns: turns,
|
|
Signal: &proto.HostConfig{
|
|
Uri: config.Signal.URI,
|
|
Protocol: toResponseProto(config.Signal.Proto),
|
|
},
|
|
}
|
|
|
|
pConfig := &proto.PeerConfig{
|
|
Address: peer.IP.String(),
|
|
}
|
|
|
|
remotePeers := make([]*proto.RemotePeerConfig, 0, len(peers))
|
|
for _, rPeer := range peers {
|
|
remotePeers = append(remotePeers, &proto.RemotePeerConfig{
|
|
WgPubKey: rPeer.Key,
|
|
AllowedIps: []string{fmt.Sprintf(AllowedIPsFormat, rPeer.IP)}, //todo /32
|
|
})
|
|
}
|
|
|
|
return &proto.SyncResponse{
|
|
WiretrusteeConfig: wtConfig,
|
|
PeerConfig: pConfig,
|
|
RemotePeers: remotePeers,
|
|
}
|
|
}
|
|
|
|
// IsHealthy indicates whether the service is healthy
|
|
func (s *Server) IsHealthy(ctx context.Context, req *proto.Empty) (*proto.Empty, error) {
|
|
return &proto.Empty{}, nil
|
|
}
|
|
|
|
// openUpdatesChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
|
|
func (s *Server) openUpdatesChannel(peerKey string) chan *UpdateChannelMessage {
|
|
s.channelsMux.Lock()
|
|
defer s.channelsMux.Unlock()
|
|
if channel, ok := s.peerChannels[peerKey]; ok {
|
|
delete(s.peerChannels, peerKey)
|
|
close(channel)
|
|
}
|
|
//mbragin: todo shouldn't it be more? or configurable?
|
|
channel := make(chan *UpdateChannelMessage, 100)
|
|
s.peerChannels[peerKey] = channel
|
|
|
|
log.Debugf("opened updates channel for a peer %s", peerKey)
|
|
return channel
|
|
}
|
|
|
|
// closeUpdatesChannel closes updates channel of a given peer
|
|
func (s *Server) closeUpdatesChannel(peerKey string) {
|
|
s.channelsMux.Lock()
|
|
defer s.channelsMux.Unlock()
|
|
if channel, ok := s.peerChannels[peerKey]; ok {
|
|
delete(s.peerChannels, peerKey)
|
|
close(channel)
|
|
}
|
|
|
|
log.Debugf("closed updates channel of a peer %s", peerKey)
|
|
}
|
|
|
|
// sendInitialSync sends initial proto.SyncResponse to the peer requesting synchronization
|
|
func (s *Server) sendInitialSync(peerKey wgtypes.Key, peer *Peer, srv proto.ManagementService_SyncServer) error {
|
|
|
|
peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
|
|
if err != nil {
|
|
log.Warnf("error getting a list of peers for a peer %s", peer.Key)
|
|
return err
|
|
}
|
|
plainResp := toSyncResponse(s.config, peer, peers)
|
|
|
|
encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, plainResp)
|
|
if err != nil {
|
|
return status.Errorf(codes.Internal, "error handling request")
|
|
}
|
|
|
|
err = srv.Send(&proto.EncryptedMessage{
|
|
WgPubKey: s.wgKey.PublicKey().String(),
|
|
Body: encryptedResp,
|
|
})
|
|
|
|
if err != nil {
|
|
log.Errorf("failed sending SyncResponse %v", err)
|
|
return status.Errorf(codes.Internal, "error handling request")
|
|
}
|
|
|
|
return nil
|
|
}
|