Add client status collection (#368)

This commit is contained in:
Maycon Santos 2022-07-02 12:02:17 +02:00 committed by GitHub
parent e95f0f7acb
commit 8c953c5a2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 524 additions and 53 deletions

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/netbirdio/netbird/client/internal" "github.com/netbirdio/netbird/client/internal"
"github.com/netbirdio/netbird/client/proto" "github.com/netbirdio/netbird/client/proto"
nbStatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -47,7 +48,7 @@ var upCmd = &cobra.Command{
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx) ctx, cancel = context.WithCancel(ctx)
SetupCloseHandler(ctx, cancel) SetupCloseHandler(ctx, cancel)
return internal.RunClient(ctx, config) return internal.RunClient(ctx, config, nbStatus.NewRecorder())
} }
conn, err := DialClientGRPCServer(ctx, daemonAddr) conn, err := DialClientGRPCServer(ctx, daemonAddr)

View File

@ -2,7 +2,10 @@ package internal
import ( import (
"context" "context"
"fmt"
"github.com/netbirdio/netbird/client/ssh" "github.com/netbirdio/netbird/client/ssh"
nbStatus "github.com/netbirdio/netbird/client/status"
"strings"
"time" "time"
"github.com/netbirdio/netbird/client/system" "github.com/netbirdio/netbird/client/system"
@ -16,11 +19,11 @@ import (
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" gstatus "google.golang.org/grpc/status"
) )
// RunClient with main logic. // RunClient with main logic.
func RunClient(ctx context.Context, config *Config) error { func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Status) error {
backOff := &backoff.ExponentialBackOff{ backOff := &backoff.ExponentialBackOff{
InitialInterval: time.Second, InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: backoff.DefaultRandomizationFactor,
@ -40,6 +43,26 @@ func RunClient(ctx context.Context, config *Config) error {
}() }()
wrapErr := state.Wrap wrapErr := state.Wrap
// validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return wrapErr(err)
}
var mgmTlsEnabled bool
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}
publicSSHKey, err := ssh.GeneratePublicKey([]byte(config.SSHKey))
if err != nil {
return err
}
managementURL := config.ManagementURL.String()
statusRecorder.MarkManagementDisconnected(managementURL)
operation := func() error { operation := func() error {
// if context cancelled we not start new backoff cycle // if context cancelled we not start new backoff cycle
select { select {
@ -49,37 +72,42 @@ func RunClient(ctx context.Context, config *Config) error {
} }
state.Set(StatusConnecting) state.Set(StatusConnecting)
// validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return wrapErr(err)
}
var mgmTlsEnabled bool
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}
engineCtx, cancel := context.WithCancel(ctx) engineCtx, cancel := context.WithCancel(ctx)
defer cancel() defer func() {
statusRecorder.MarkManagementDisconnected(managementURL)
cancel()
}()
publicSSHKey, err := ssh.GeneratePublicKey([]byte(config.SSHKey))
if err != nil {
return err
}
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config // connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(engineCtx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled, mgmClient, loginResp, err := connectToManagement(engineCtx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled,
publicSSHKey) publicSSHKey)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)
if s, ok := status.FromError(err); ok && s.Code() == codes.PermissionDenied { if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
log.Info("peer registration required. Please run `netbird status` for details") log.Info("peer registration required. Please run `netbird status` for details")
state.Set(StatusNeedsLogin) state.Set(StatusNeedsLogin)
return nil return nil
} }
return wrapErr(err) return wrapErr(err)
} }
statusRecorder.MarkManagementConnected(managementURL)
localPeerState := nbStatus.LocalPeerState{
IP: loginResp.GetPeerConfig().GetAddress(),
PubKey: myPrivateKey.PublicKey().String(),
KernelInterface: iface.WireguardModExists(),
}
statusRecorder.UpdateLocalPeerState(localPeerState)
signalURL := fmt.Sprintf("%s://%s",
strings.ToLower(loginResp.GetWiretrusteeConfig().GetSignal().GetProtocol().String()),
loginResp.GetWiretrusteeConfig().GetSignal().GetUri(),
)
statusRecorder.MarkSignalDisconnected(signalURL)
defer statusRecorder.MarkSignalDisconnected(signalURL)
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal // with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(engineCtx, loginResp.GetWiretrusteeConfig(), myPrivateKey) signalClient, err := connectToSignal(engineCtx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
@ -88,6 +116,8 @@ func RunClient(ctx context.Context, config *Config) error {
return wrapErr(err) return wrapErr(err)
} }
statusRecorder.MarkSignalConnected(signalURL)
peerConfig := loginResp.GetPeerConfig() peerConfig := loginResp.GetPeerConfig()
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig) engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
@ -96,7 +126,7 @@ func RunClient(ctx context.Context, config *Config) error {
return wrapErr(err) return wrapErr(err)
} }
engine := NewEngine(engineCtx, cancel, signalClient, mgmClient, engineConfig) engine := NewEngine(engineCtx, cancel, signalClient, mgmClient, engineConfig, statusRecorder)
err = engine.Start() err = engine.Start()
if err != nil { if err != nil {
log.Errorf("error while starting Netbird Connection Engine: %s", err) log.Errorf("error while starting Netbird Connection Engine: %s", err)
@ -115,6 +145,7 @@ func RunClient(ctx context.Context, config *Config) error {
log.Errorf("failed closing Management Service client %v", err) log.Errorf("failed closing Management Service client %v", err)
return wrapErr(err) return wrapErr(err)
} }
err = signalClient.Close() err = signalClient.Close()
if err != nil { if err != nil {
log.Errorf("failed closing Signal Service client %v", err) log.Errorf("failed closing Signal Service client %v", err)
@ -136,7 +167,7 @@ func RunClient(ctx context.Context, config *Config) error {
return nil return nil
} }
err := backoff.Retry(operation, backOff) err = backoff.Retry(operation, backOff)
if err != nil { if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err) log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
return err return err
@ -179,7 +210,7 @@ func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig,
signalClient, err := signal.NewClient(ctx, wtConfig.Signal.Uri, ourPrivateKey, sigTLSEnabled) signalClient, err := signal.NewClient(ctx, wtConfig.Signal.Uri, ourPrivateKey, sigTLSEnabled)
if err != nil { if err != nil {
log.Errorf("error while connecting to the Signal Exchange Service %s: %s", wtConfig.Signal.Uri, err) log.Errorf("error while connecting to the Signal Exchange Service %s: %s", wtConfig.Signal.Uri, err)
return nil, status.Errorf(codes.FailedPrecondition, "failed connecting to Signal Service : %s", err) return nil, gstatus.Errorf(codes.FailedPrecondition, "failed connecting to Signal Service : %s", err)
} }
return signalClient, nil return signalClient, nil
@ -190,13 +221,13 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK
log.Debugf("connecting to Management Service %s", managementAddr) log.Debugf("connecting to Management Service %s", managementAddr)
client, err := mgm.NewClient(ctx, managementAddr, ourPrivateKey, tlsEnabled) client, err := mgm.NewClient(ctx, managementAddr, ourPrivateKey, tlsEnabled)
if err != nil { if err != nil {
return nil, nil, status.Errorf(codes.FailedPrecondition, "failed connecting to Management Service : %s", err) return nil, nil, gstatus.Errorf(codes.FailedPrecondition, "failed connecting to Management Service : %s", err)
} }
log.Debugf("connected to management server %s", managementAddr) log.Debugf("connected to management server %s", managementAddr)
serverPublicKey, err := client.GetServerPublicKey() serverPublicKey, err := client.GetServerPublicKey()
if err != nil { if err != nil {
return nil, nil, status.Errorf(codes.FailedPrecondition, "failed while getting Management Service public key: %s", err) return nil, nil, gstatus.Errorf(codes.FailedPrecondition, "failed while getting Management Service public key: %s", err)
} }
sysInfo := system.GetInfo(ctx) sysInfo := system.GetInfo(ctx)

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
nbssh "github.com/netbirdio/netbird/client/ssh" nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"math/rand" "math/rand"
"net" "net"
"runtime" "runtime"
@ -95,6 +96,8 @@ type Engine struct {
sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error) sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error)
sshServer nbssh.Server sshServer nbssh.Server
statusRecorder *nbstatus.Status
} }
// Peer is an instance of the Connection Peer // Peer is an instance of the Connection Peer
@ -106,20 +109,22 @@ type Peer struct {
// NewEngine creates a new Connection Engine // NewEngine creates a new Connection Engine
func NewEngine( func NewEngine(
ctx context.Context, cancel context.CancelFunc, ctx context.Context, cancel context.CancelFunc,
signalClient signal.Client, mgmClient mgm.Client, config *EngineConfig, signalClient signal.Client, mgmClient mgm.Client,
config *EngineConfig, statusRecorder *nbstatus.Status,
) *Engine { ) *Engine {
return &Engine{ return &Engine{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
signal: signalClient, signal: signalClient,
mgmClient: mgmClient, mgmClient: mgmClient,
peerConns: map[string]*peer.Conn{}, peerConns: map[string]*peer.Conn{},
syncMsgMux: &sync.Mutex{}, syncMsgMux: &sync.Mutex{},
config: config, config: config,
STUNs: []*ice.URL{}, STUNs: []*ice.URL{},
TURNs: []*ice.URL{}, TURNs: []*ice.URL{},
networkSerial: 0, networkSerial: 0,
sshServerFunc: nbssh.DefaultSSHServer, sshServerFunc: nbssh.DefaultSSHServer,
statusRecorder: statusRecorder,
} }
} }
@ -302,8 +307,13 @@ func (e *Engine) removePeer(peerKey string) error {
conn, exists := e.peerConns[peerKey] conn, exists := e.peerConns[peerKey]
if exists { if exists {
err := e.statusRecorder.RemovePeer(peerKey)
if err != nil {
log.Warn("received error when removing peer from status recorder: ", err)
}
delete(e.peerConns, peerKey) delete(e.peerConns, peerKey)
err := conn.Close() err = conn.Close()
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
case *peer.ConnectionAlreadyClosedError: case *peer.ConnectionAlreadyClosedError:
@ -623,6 +633,11 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
} }
e.peerConns[peerKey] = conn e.peerConns[peerKey] = conn
err = e.statusRecorder.AddPeer(peerKey)
if err != nil {
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
}
go e.connWorker(conn, peerKey) go e.connWorker(conn, peerKey)
} }
return nil return nil
@ -693,7 +708,7 @@ func (e Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, er
ProxyConfig: proxyConfig, ProxyConfig: proxyConfig,
} }
peerConn, err := peer.NewConn(config) peerConn, err := peer.NewConn(config, e.statusRecorder)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/netbirdio/netbird/client/ssh" "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/iface" "github.com/netbirdio/netbird/iface"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"net" "net"
@ -63,7 +64,7 @@ func TestEngine_SSH(t *testing.T) {
WgAddr: "100.64.0.1/24", WgAddr: "100.64.0.1/24",
WgPrivateKey: key, WgPrivateKey: key,
WgPort: 33100, WgPort: 33100,
}) }, nbstatus.NewRecorder())
var sshKeysAdded []string var sshKeysAdded []string
var sshPeersRemoved []string var sshPeersRemoved []string
@ -193,7 +194,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
WgAddr: "100.64.0.1/24", WgAddr: "100.64.0.1/24",
WgPrivateKey: key, WgPrivateKey: key,
WgPort: 33100, WgPort: 33100,
}) }, nbstatus.NewRecorder())
engine.wgInterface, err = iface.NewWGIFace("utun102", "100.64.0.1/24", iface.DefaultMTU) engine.wgInterface, err = iface.NewWGIFace("utun102", "100.64.0.1/24", iface.DefaultMTU)
type testCase struct { type testCase struct {
@ -373,7 +374,7 @@ func TestEngine_Sync(t *testing.T) {
WgAddr: "100.64.0.1/24", WgAddr: "100.64.0.1/24",
WgPrivateKey: key, WgPrivateKey: key,
WgPort: 33100, WgPort: 33100,
}) }, nbstatus.NewRecorder())
defer func() { defer func() {
err := engine.Stop() err := engine.Stop()
@ -576,7 +577,7 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
WgPort: wgPort, WgPort: wgPort,
} }
return NewEngine(ctx, cancel, signalClient, mgmtClient, conf), nil return NewEngine(ctx, cancel, signalClient, mgmtClient, conf, nbstatus.NewRecorder()), nil
} }
func startSignal(port int) (*grpc.Server, error) { func startSignal(port int) (*grpc.Server, error) {

View File

@ -2,6 +2,7 @@ package peer
import ( import (
"context" "context"
nbStatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/iface" "github.com/netbirdio/netbird/iface"
"golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl"
"net" "net"
@ -64,6 +65,8 @@ type Conn struct {
agent *ice.Agent agent *ice.Agent
status ConnStatus status ConnStatus
statusRecorder *nbStatus.Status
proxy proxy.Proxy proxy proxy.Proxy
} }
@ -74,7 +77,7 @@ func (conn *Conn) GetConf() ConnConfig {
// NewConn creates a new not opened Conn to the remote peer. // NewConn creates a new not opened Conn to the remote peer.
// To establish a connection run Conn.Open // To establish a connection run Conn.Open
func NewConn(config ConnConfig) (*Conn, error) { func NewConn(config ConnConfig, statusRecorder *nbStatus.Status) (*Conn, error) {
return &Conn{ return &Conn{
config: config, config: config,
mu: sync.Mutex{}, mu: sync.Mutex{},
@ -82,6 +85,7 @@ func NewConn(config ConnConfig) (*Conn, error) {
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
remoteOffersCh: make(chan IceCredentials), remoteOffersCh: make(chan IceCredentials),
remoteAnswerCh: make(chan IceCredentials), remoteAnswerCh: make(chan IceCredentials),
statusRecorder: statusRecorder,
}, nil }, nil
} }
@ -157,6 +161,17 @@ func (conn *Conn) reCreateAgent() error {
func (conn *Conn) Open() error { func (conn *Conn) Open() error {
log.Debugf("trying to connect to peer %s", conn.config.Key) log.Debugf("trying to connect to peer %s", conn.config.Key)
peerState := nbStatus.PeerState{PubKey: conn.config.Key}
peerState.IP = strings.Split(conn.config.ProxyConfig.AllowedIps, "/")[0]
peerState.ConnStatusUpdate = time.Now()
peerState.ConnStatus = conn.status.String()
err := conn.statusRecorder.UpdatePeerState(peerState)
if err != nil {
log.Warnf("erro while updating the state of peer %s,err: %v", conn.config.Key, err)
}
defer func() { defer func() {
err := conn.cleanup() err := conn.cleanup()
if err != nil { if err != nil {
@ -165,7 +180,7 @@ func (conn *Conn) Open() error {
} }
}() }()
err := conn.reCreateAgent() err = conn.reCreateAgent()
if err != nil { if err != nil {
return err return err
} }
@ -205,6 +220,15 @@ func (conn *Conn) Open() error {
defer conn.notifyDisconnected() defer conn.notifyDisconnected()
conn.mu.Unlock() conn.mu.Unlock()
peerState = nbStatus.PeerState{PubKey: conn.config.Key}
peerState.ConnStatus = conn.status.String()
peerState.ConnStatusUpdate = time.Now()
err = conn.statusRecorder.UpdatePeerState(peerState)
if err != nil {
log.Warnf("erro while updating the state of peer %s,err: %v", conn.config.Key, err)
}
err = conn.agent.GatherCandidates() err = conn.agent.GatherCandidates()
if err != nil { if err != nil {
return err return err
@ -224,7 +248,7 @@ func (conn *Conn) Open() error {
return err return err
} }
// the connection has been established successfully so we are ready to start the proxy // the ice connection has been established successfully so we are ready to start the proxy
err = conn.startProxy(remoteConn) err = conn.startProxy(remoteConn)
if err != nil { if err != nil {
return err return err
@ -296,12 +320,15 @@ func (conn *Conn) startProxy(remoteConn net.Conn) error {
return err return err
} }
peerState := nbStatus.PeerState{PubKey: conn.config.Key}
useProxy := shouldUseProxy(pair) useProxy := shouldUseProxy(pair)
var p proxy.Proxy var p proxy.Proxy
if useProxy { if useProxy {
p = proxy.NewWireguardProxy(conn.config.ProxyConfig) p = proxy.NewWireguardProxy(conn.config.ProxyConfig)
peerState.Direct = false
} else { } else {
p = proxy.NewNoProxy(conn.config.ProxyConfig) p = proxy.NewNoProxy(conn.config.ProxyConfig)
peerState.Direct = true
} }
conn.proxy = p conn.proxy = p
err = p.Start(remoteConn) err = p.Start(remoteConn)
@ -311,6 +338,19 @@ func (conn *Conn) startProxy(remoteConn net.Conn) error {
conn.status = StatusConnected conn.status = StatusConnected
peerState.ConnStatus = conn.status.String()
peerState.ConnStatusUpdate = time.Now()
peerState.LocalIceCandidateType = pair.Local.Type().String()
peerState.RemoteIceCandidateType = pair.Remote.Type().String()
if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay {
peerState.Relayed = true
}
err = conn.statusRecorder.UpdatePeerState(peerState)
if err != nil {
log.Warnf("unable to save peer's state, got error: %v", err)
}
return nil return nil
} }
@ -343,6 +383,14 @@ func (conn *Conn) cleanup() error {
conn.status = StatusDisconnected conn.status = StatusDisconnected
peerState := nbStatus.PeerState{PubKey: conn.config.Key}
peerState.ConnStatus = conn.status.String()
peerState.ConnStatusUpdate = time.Now()
err := conn.statusRecorder.UpdatePeerState(peerState)
if err != nil {
log.Warnf("error while updating peer's %s state, err: %v", conn.config.Key, err)
}
log.Debugf("cleaned up connection to peer %s", conn.config.Key) log.Debugf("cleaned up connection to peer %s", conn.config.Key)
return nil return nil

View File

@ -3,6 +3,7 @@ package peer
import ( import (
"github.com/magiconair/properties/assert" "github.com/magiconair/properties/assert"
"github.com/netbirdio/netbird/client/internal/proxy" "github.com/netbirdio/netbird/client/internal/proxy"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/iface" "github.com/netbirdio/netbird/iface"
"github.com/pion/ice/v2" "github.com/pion/ice/v2"
"sync" "sync"
@ -32,7 +33,7 @@ func TestNewConn_interfaceFilter(t *testing.T) {
} }
func TestConn_GetKey(t *testing.T) { func TestConn_GetKey(t *testing.T) {
conn, err := NewConn(connConf) conn, err := NewConn(connConf, nil)
if err != nil { if err != nil {
return return
} }
@ -44,7 +45,7 @@ func TestConn_GetKey(t *testing.T) {
func TestConn_OnRemoteOffer(t *testing.T) { func TestConn_OnRemoteOffer(t *testing.T) {
conn, err := NewConn(connConf) conn, err := NewConn(connConf, nbstatus.NewRecorder())
if err != nil { if err != nil {
return return
} }
@ -74,7 +75,7 @@ func TestConn_OnRemoteOffer(t *testing.T) {
func TestConn_OnRemoteAnswer(t *testing.T) { func TestConn_OnRemoteAnswer(t *testing.T) {
conn, err := NewConn(connConf) conn, err := NewConn(connConf, nbstatus.NewRecorder())
if err != nil { if err != nil {
return return
} }
@ -103,7 +104,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
} }
func TestConn_Status(t *testing.T) { func TestConn_Status(t *testing.T) {
conn, err := NewConn(connConf) conn, err := NewConn(connConf, nbstatus.NewRecorder())
if err != nil { if err != nil {
return return
} }
@ -130,7 +131,7 @@ func TestConn_Status(t *testing.T) {
func TestConn_Close(t *testing.T) { func TestConn_Close(t *testing.T) {
conn, err := NewConn(connConf) conn, err := NewConn(connConf, nbstatus.NewRecorder())
if err != nil { if err != nil {
return return
} }

View File

@ -3,6 +3,7 @@ package server
import ( import (
"context" "context"
"fmt" "fmt"
nbStatus "github.com/netbirdio/netbird/client/status"
"sync" "sync"
"time" "time"
@ -31,6 +32,8 @@ type Server struct {
mutex sync.Mutex mutex sync.Mutex
config *internal.Config config *internal.Config
proto.UnimplementedDaemonServiceServer proto.UnimplementedDaemonServiceServer
statusRecorder *nbStatus.Status
} }
type oauthAuthFlow struct { type oauthAuthFlow struct {
@ -52,6 +55,8 @@ func New(ctx context.Context, managementURL, adminURL, configPath, logFile strin
} }
func (s *Server) Start() error { func (s *Server) Start() error {
s.mutex.Lock()
defer s.mutex.Unlock()
state := internal.CtxGetState(s.rootCtx) state := internal.CtxGetState(s.rootCtx)
// if current state contains any error, return it // if current state contains any error, return it
@ -89,8 +94,10 @@ func (s *Server) Start() error {
s.config = config s.config = config
s.statusRecorder = nbStatus.NewRecorder()
go func() { go func() {
if err := internal.RunClient(ctx, config); err != nil { if err := internal.RunClient(ctx, config, s.statusRecorder); err != nil {
log.Errorf("init connections: %v", err) log.Errorf("init connections: %v", err)
} }
}() }()
@ -350,8 +357,12 @@ func (s *Server) Up(callerCtx context.Context, msg *proto.UpRequest) (*proto.UpR
return nil, fmt.Errorf("config is not defined, please call login command first") return nil, fmt.Errorf("config is not defined, please call login command first")
} }
if s.statusRecorder == nil {
s.statusRecorder = nbStatus.NewRecorder()
}
go func() { go func() {
if err := internal.RunClient(ctx, s.config); err != nil { if err := internal.RunClient(ctx, s.config, s.statusRecorder); err != nil {
log.Errorf("run client connection: %v", state.Wrap(err)) log.Errorf("run client connection: %v", state.Wrap(err))
return return
} }

183
client/status/status.go Normal file
View File

@ -0,0 +1,183 @@
package status
import (
"errors"
"sync"
"time"
)
// PeerState contains the latest state of a peer
type PeerState struct {
IP string
PubKey string
ConnStatus string
ConnStatusUpdate time.Time
Relayed bool
Direct bool
LocalIceCandidateType string
RemoteIceCandidateType string
}
// LocalPeerState contains the latest state of the local peer
type LocalPeerState struct {
IP string
PubKey string
KernelInterface bool
}
// SignalState contains the latest state of a signal connection
type SignalState struct {
URL string
Connected bool
}
// ManagementState contains the latest state of a management connection
type ManagementState struct {
URL string
Connected bool
}
// FullStatus contains the full state held by the Status instance
type FullStatus struct {
Peers []PeerState
ManagementState ManagementState
SignalState SignalState
LocalPeerState LocalPeerState
}
// Status holds a state of peers, signal and management connections
type Status struct {
mux sync.Mutex
peers map[string]PeerState
signal SignalState
management ManagementState
localPeer LocalPeerState
}
// NewRecorder returns a new Status instance
func NewRecorder() *Status {
return &Status{
peers: make(map[string]PeerState),
}
}
// AddPeer adds peer to Daemon status map
func (d *Status) AddPeer(peerPubKey string) error {
d.mux.Lock()
defer d.mux.Unlock()
_, ok := d.peers[peerPubKey]
if ok {
return errors.New("peer already exist")
}
d.peers[peerPubKey] = PeerState{PubKey: peerPubKey}
return nil
}
// RemovePeer removes peer from Daemon status map
func (d *Status) RemovePeer(peerPubKey string) error {
d.mux.Lock()
defer d.mux.Unlock()
_, ok := d.peers[peerPubKey]
if ok {
delete(d.peers, peerPubKey)
return nil
}
return errors.New("no peer with to remove")
}
// UpdatePeerState updates peer status
func (d *Status) UpdatePeerState(receivedState PeerState) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
return errors.New("peer doesn't exist")
}
if receivedState.IP != "" {
peerState.IP = receivedState.IP
}
if receivedState.ConnStatus != peerState.ConnStatus {
peerState.ConnStatus = receivedState.ConnStatus
peerState.ConnStatusUpdate = receivedState.ConnStatusUpdate
peerState.Direct = receivedState.Direct
peerState.Relayed = receivedState.Relayed
peerState.LocalIceCandidateType = receivedState.LocalIceCandidateType
peerState.RemoteIceCandidateType = receivedState.RemoteIceCandidateType
}
d.peers[receivedState.PubKey] = peerState
return nil
}
// UpdateLocalPeerState updates local peer status
func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
d.mux.Lock()
defer d.mux.Unlock()
d.localPeer = localPeerState
}
// MarkManagementDisconnected sets ManagementState to disconnected
func (d *Status) MarkManagementDisconnected(managementURL string) {
d.mux.Lock()
defer d.mux.Unlock()
d.management = ManagementState{
URL: managementURL,
Connected: false,
}
}
// MarkManagementConnected sets ManagementState to connected
func (d *Status) MarkManagementConnected(managementURL string) {
d.mux.Lock()
defer d.mux.Unlock()
d.management = ManagementState{
URL: managementURL,
Connected: true,
}
}
// MarkSignalDisconnected sets SignalState to disconnected
func (d *Status) MarkSignalDisconnected(signalURL string) {
d.mux.Lock()
defer d.mux.Unlock()
d.signal = SignalState{
signalURL,
false,
}
}
// MarkSignalConnected sets SignalState to connected
func (d *Status) MarkSignalConnected(signalURL string) {
d.mux.Lock()
defer d.mux.Unlock()
d.signal = SignalState{
signalURL,
true,
}
}
// GetFullStatus gets full status
func (d *Status) GetFullStatus() FullStatus {
d.mux.Lock()
defer d.mux.Unlock()
fullStatus := FullStatus{
ManagementState: d.management,
SignalState: d.signal,
LocalPeerState: d.localPeer,
}
for _, status := range d.peers {
fullStatus.Peers = append(fullStatus.Peers, status)
}
return fullStatus
}

View File

@ -0,0 +1,169 @@
package status
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestAddPeer(t *testing.T) {
key := "abc"
status := NewRecorder()
err := status.AddPeer(key)
assert.NoError(t, err, "shouldn't return error")
_, exists := status.peers[key]
assert.True(t, exists, "value was found")
err = status.AddPeer(key)
assert.Error(t, err, "should return error on duplicate")
}
func TestUpdatePeerState(t *testing.T) {
key := "abc"
ip := "10.10.10.10"
status := NewRecorder()
peerState := PeerState{
PubKey: key,
}
status.peers[key] = peerState
peerState.IP = ip
err := status.UpdatePeerState(peerState)
assert.NoError(t, err, "shouldn't return error")
state, exists := status.peers[key]
assert.True(t, exists, "state should be found")
assert.Equal(t, ip, state.IP, "ip should be equal")
}
func TestRemovePeer(t *testing.T) {
key := "abc"
status := NewRecorder()
peerState := PeerState{
PubKey: key,
}
status.peers[key] = peerState
err := status.RemovePeer(key)
assert.NoError(t, err, "shouldn't return error")
_, exists := status.peers[key]
assert.False(t, exists, "state value shouldn't be found")
err = status.RemovePeer("not existing")
assert.Error(t, err, "should return error when peer doesn't exist")
}
func TestUpdateLocalPeerState(t *testing.T) {
localPeerState := LocalPeerState{
IP: "10.10.10.10",
PubKey: "abc",
KernelInterface: false,
}
status := NewRecorder()
status.UpdateLocalPeerState(localPeerState)
assert.Equal(t, localPeerState, status.localPeer, "local peer status should be equal")
}
func TestUpdateSignalState(t *testing.T) {
url := "https://signal"
var tests = []struct {
name string
connected bool
want SignalState
}{
{"should mark as connected", true, SignalState{
URL: url,
Connected: true,
}},
{"should mark as disconnected", false, SignalState{
URL: url,
Connected: false,
}},
}
status := NewRecorder()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.connected {
status.MarkSignalConnected(url)
} else {
status.MarkSignalDisconnected(url)
}
assert.Equal(t, test.want, status.signal, "signal status should be equal")
})
}
}
func TestUpdateManagementState(t *testing.T) {
url := "https://management"
var tests = []struct {
name string
connected bool
want ManagementState
}{
{"should mark as connected", true, ManagementState{
URL: url,
Connected: true,
}},
{"should mark as disconnected", false, ManagementState{
URL: url,
Connected: false,
}},
}
status := NewRecorder()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.connected {
status.MarkManagementConnected(url)
} else {
status.MarkManagementDisconnected(url)
}
assert.Equal(t, test.want, status.management, "signal status should be equal")
})
}
}
func TestGetFullStatus(t *testing.T) {
key1 := "abc"
key2 := "def"
managementState := ManagementState{
URL: "https://signal",
Connected: true,
}
signalState := SignalState{
URL: "https://signal",
Connected: true,
}
peerState1 := PeerState{
PubKey: key1,
}
peerState2 := PeerState{
PubKey: key2,
}
status := NewRecorder()
status.management = managementState
status.signal = signalState
status.peers[key1] = peerState1
status.peers[key2] = peerState2
fullStatus := status.GetFullStatus()
assert.Equal(t, managementState, fullStatus.ManagementState, "management status should be equal")
assert.Equal(t, signalState, fullStatus.SignalState, "signal status should be equal")
assert.ElementsMatch(t, []PeerState{peerState1, peerState2}, fullStatus.Peers, "peers states should match")
}

View File

@ -33,3 +33,8 @@ func (w *WGIface) assignAddr() error {
return nil return nil
} }
// WireguardModExists check if we can load wireguard mod (linux only)
func WireguardModExists() bool {
return false
}

View File

@ -14,6 +14,7 @@ type NativeLink struct {
Link *netlink.Link Link *netlink.Link
} }
// WireguardModExists check if we can load wireguard mod (linux only)
func WireguardModExists() bool { func WireguardModExists() bool {
link := newWGLink("mustnotexist") link := newWGLink("mustnotexist")

View File

@ -57,3 +57,8 @@ func (w *WGIface) UpdateAddr(newAddr string) error {
w.Address = addr w.Address = addr
return w.assignAddr(luid) return w.assignAddr(luid)
} }
// WireguardModExists check if we can load wireguard mod (linux only)
func WireguardModExists() bool {
return false
}