Add peer conn profile code

This commit is contained in:
Zoltán Papp 2024-12-19 13:53:51 +01:00
parent 37ad370344
commit 9ff03141ba
12 changed files with 371 additions and 2 deletions

View File

@ -218,3 +218,31 @@ func (c *KernelConfigurer) GetStats(peerKey string) (WGStats, error) {
RxBytes: peer.ReceiveBytes,
}, nil
}
func (c *KernelConfigurer) GetAllStat() (map[string]WGStats, error) {
wg, err := wgctrl.New()
if err != nil {
return nil, fmt.Errorf("wgctl: %w", err)
}
defer func() {
err = wg.Close()
if err != nil {
log.Errorf("Got error while closing wgctl: %v", err)
}
}()
wgDevice, err := wg.Device(c.deviceName)
if err != nil {
return nil, fmt.Errorf("get device %s: %w", c.deviceName, err)
}
stats := make(map[string]WGStats)
for _, peer := range wgDevice.Peers {
stats[peer.PublicKey.String()] = WGStats{
LastHandshake: peer.LastHandshakeTime,
TxBytes: peer.TransmitBytes,
RxBytes: peer.ReceiveBytes,
}
}
return stats, nil
}

View File

@ -263,6 +263,52 @@ func (t *WGUSPConfigurer) GetStats(peerKey string) (WGStats, error) {
}, nil
}
func (t *WGUSPConfigurer) GetAllStat() (map[string]WGStats, error) {
ipc, err := t.device.IpcGet()
if err != nil {
return nil, fmt.Errorf("ipc get: %w", err)
}
stats, err := parsePeerInfo(ipc, []string{
"last_handshake_time_sec",
"last_handshake_time_nsec",
"tx_bytes",
"rx_bytes",
})
if err != nil {
return nil, fmt.Errorf("find peer info: %w", err)
}
wgStats := make(map[string]WGStats)
for k, v := range stats {
sec, err := strconv.ParseInt(v["last_handshake_time_sec"], 10, 64)
if err != nil {
return nil, fmt.Errorf("parse handshake sec: %w", err)
}
nsec, err := strconv.ParseInt(v["last_handshake_time_nsec"], 10, 64)
if err != nil {
return nil, fmt.Errorf("parse handshake nsec: %w", err)
}
txBytes, err := strconv.ParseInt(v["tx_bytes"], 10, 64)
if err != nil {
return nil, fmt.Errorf("parse tx_bytes: %w", err)
}
rxBytes, err := strconv.ParseInt(v["rx_bytes"], 10, 64)
if err != nil {
return nil, fmt.Errorf("parse rx_bytes: %w", err)
}
wgStats[k] = WGStats{
LastHandshake: time.Unix(sec, nsec),
TxBytes: txBytes,
RxBytes: rxBytes,
}
}
return wgStats, nil
}
func findPeerInfo(ipcInput string, peerKey string, searchConfigKeys []string) (map[string]string, error) {
peerKeyParsed, err := wgtypes.ParseKey(peerKey)
if err != nil {
@ -310,6 +356,44 @@ func findPeerInfo(ipcInput string, peerKey string, searchConfigKeys []string) (m
return configFound, nil
}
func parsePeerInfo(ipcInput string, searchConfigKeys []string) (map[string]map[string]string, error) {
lines := strings.Split(ipcInput, "\n")
allPeers := map[string]map[string]string{}
var currentPeerKey string
for _, line := range lines {
line = strings.TrimSpace(line)
// Detect new peer section by public key
if strings.HasPrefix(line, "public_key=") {
hexKey := strings.TrimPrefix(line, "public_key=")
keyBytes, _ := hex.DecodeString(hexKey)
wgKey, _ := wgtypes.NewKey(keyBytes)
currentPeerKey = wgKey.String()
if _, exists := allPeers[currentPeerKey]; !exists {
allPeers[currentPeerKey] = map[string]string{}
}
continue
}
// Parse configuration keys for the current peer
if currentPeerKey != "" {
for _, key := range searchConfigKeys {
if strings.HasPrefix(line, key+"=") {
v := strings.SplitN(line, "=", 2)
if len(v) == 2 {
allPeers[currentPeerKey][v[0]] = v[1]
}
}
}
}
}
return allPeers, nil
}
func toWgUserspaceString(wgCfg wgtypes.Config) string {
var sb strings.Builder
if wgCfg.PrivateKey != nil {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"testing"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@ -34,6 +35,19 @@ errno=0
`
func Test_parsePeerInto(t *testing.T) {
r, err := parsePeerInfo(ipcFixture, []string{
"last_handshake_time_sec",
"last_handshake_time_nsec",
"tx_bytes",
"rx_bytes",
})
if err != nil {
t.Errorf("parsePeerInfo() error = %v", err)
}
log.Infof("r: %v", r)
}
func Test_findPeerInfo(t *testing.T) {
tests := []struct {
name string

View File

@ -17,4 +17,5 @@ type WGConfigurer interface {
RemoveAllowedIP(peerKey string, allowedIP string) error
Close()
GetStats(peerKey string) (configurer.WGStats, error)
GetAllStat() (map[string]configurer.WGStats, error)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/connprofile"
)
const (
@ -114,7 +115,13 @@ func (w *WGIface) UpdatePeer(peerKey string, allowedIps string, keepAlive time.D
defer w.mu.Unlock()
log.Debugf("updating interface %s peer %s, endpoint %s", w.tun.DeviceName(), peerKey, endpoint)
return w.configurer.UpdatePeer(peerKey, allowedIps, keepAlive, endpoint, preSharedKey)
err := w.configurer.UpdatePeer(peerKey, allowedIps, keepAlive, endpoint, preSharedKey)
if err != nil {
return err
}
connprofile.Profiler.WireGuardConfigured(peerKey)
return nil
}
// RemovePeer removes a Wireguard Peer from the interface iface
@ -208,6 +215,10 @@ func (w *WGIface) GetStats(peerKey string) (configurer.WGStats, error) {
return w.configurer.GetStats(peerKey)
}
func (w *WGIface) GetAllStat() (map[string]configurer.WGStats, error) {
return w.configurer.GetAllStat()
}
func (w *WGIface) waitUntilRemoved() error {
maxWaitTime := 5 * time.Second
timeout := time.NewTimer(maxWaitTime)

View File

@ -33,4 +33,5 @@ type IWGIface interface {
GetFilter() device.PacketFilter
GetDevice() *device.FilteredDevice
GetStats(peerKey string) (configurer.WGStats, error)
GetAllStat() (map[string]configurer.WGStats, error)
}

View File

@ -39,6 +39,7 @@ import (
"github.com/netbirdio/netbird/client/internal/routemanager"
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
"github.com/netbirdio/netbird/client/internal/statemanager"
"github.com/netbirdio/netbird/connprofile"
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
nbssh "github.com/netbirdio/netbird/client/ssh"
@ -420,6 +421,8 @@ func (e *Engine) Start() error {
return fmt.Errorf("up wg interface: %w", err)
}
connprofile.Profiler.WGInterfaceUP(e.wgInterface)
if e.firewall != nil {
e.acl = acl.NewDefaultManager(e.firewall)
}
@ -786,7 +789,6 @@ func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error {
}
func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
// intentionally leave it before checking serial because for now it can happen that peer IP changed but serial didn't
if networkMap.GetPeerConfig() != nil {
err := e.updateConfig(networkMap.GetPeerConfig())
@ -821,6 +823,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
e.clientRoutesMu.Unlock()
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
connprofile.Profiler.NetworkMapUpdate(networkMap.GetRemotePeers())
e.updateOfflinePeers(networkMap.GetOfflinePeers())
@ -1105,6 +1108,7 @@ func (e *Engine) receiveSignalEvents() {
RosenpassAddr: rosenpassAddr,
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
})
connprofile.Profiler.OfferAnswerReceived(msg.Key)
case sProto.Body_ANSWER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
@ -1128,6 +1132,7 @@ func (e *Engine) receiveSignalEvents() {
RosenpassAddr: rosenpassAddr,
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
})
connprofile.Profiler.OfferAnswerReceived(msg.Key)
case sProto.Body_CANDIDATE:
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
if err != nil {

View File

@ -4,6 +4,7 @@ import (
"github.com/pion/ice/v3"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/connprofile"
signal "github.com/netbirdio/netbird/signal/client"
sProto "github.com/netbirdio/netbird/signal/proto"
)
@ -66,5 +67,6 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,
return err
}
connprofile.Profiler.OfferSent(remoteKey)
return nil
}

7
connprofile/iface.go Normal file
View File

@ -0,0 +1,7 @@
package connprofile
import "github.com/netbirdio/netbird/client/iface/configurer"
type wgIface interface {
GetAllStat() (map[string]configurer.WGStats, error)
}

160
connprofile/profiler.go Normal file
View File

@ -0,0 +1,160 @@
package connprofile
import (
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/management/proto"
)
type Profile struct {
NetworkMapUpdate time.Time
OfferSent time.Time
OfferReceived time.Time
WireGuardConfigured time.Time
WireGuardConnected time.Time
}
type ConnProfiler struct {
profiles map[string]*Profile
profilesMu sync.Mutex
wgIface wgIface
wgMu sync.Mutex
}
func NewConnProfiler() *ConnProfiler {
return &ConnProfiler{
profiles: make(map[string]*Profile),
}
}
func (p *ConnProfiler) GetProfiles() map[string]Profile {
p.profilesMu.Lock()
defer p.profilesMu.Unlock()
copiedProfiles := make(map[string]Profile)
for key, profile := range p.profiles {
copiedProfiles[key] = Profile{
NetworkMapUpdate: profile.NetworkMapUpdate,
OfferSent: profile.OfferSent,
OfferReceived: profile.OfferReceived,
WireGuardConfigured: profile.WireGuardConfigured,
WireGuardConnected: profile.WireGuardConnected,
}
}
return copiedProfiles
}
func (p *ConnProfiler) WGInterfaceUP(wgInterface wgIface) {
p.wgMu.Lock()
defer p.wgMu.Unlock()
if p.wgIface != nil {
return
}
p.wgIface = wgInterface
go p.watchHandshakes()
}
func (p *ConnProfiler) NetworkMapUpdate(peerConfigs []*proto.RemotePeerConfig) {
p.profilesMu.Lock()
defer p.profilesMu.Unlock()
for _, peerConfig := range peerConfigs {
profile, ok := p.profiles[peerConfig.WgPubKey]
if ok {
continue
}
profile = &Profile{
NetworkMapUpdate: time.Now(),
}
p.profiles[peerConfig.WgPubKey] = profile
}
}
func (p *ConnProfiler) OfferSent(peerID string) {
p.profilesMu.Lock()
defer p.profilesMu.Unlock()
profile, ok := p.profiles[peerID]
if !ok {
log.Warnf("OfferSent: profile not found for peer %s", peerID)
return
}
if !profile.OfferSent.IsZero() {
return
}
profile.OfferSent = time.Now()
}
func (p *ConnProfiler) OfferAnswerReceived(peerID string) {
p.profilesMu.Lock()
defer p.profilesMu.Unlock()
profile, ok := p.profiles[peerID]
if !ok {
log.Warnf("OfferSent: profile not found for peer %s", peerID)
return
}
if !profile.OfferReceived.IsZero() {
return
}
profile.OfferReceived = time.Now()
}
func (p *ConnProfiler) WireGuardConfigured(peerID string) {
p.profilesMu.Lock()
defer p.profilesMu.Unlock()
profile, ok := p.profiles[peerID]
if !ok {
log.Warnf("OfferSent: profile not found for peer %s", peerID)
return
}
if !profile.WireGuardConfigured.IsZero() {
return
}
profile.WireGuardConfigured = time.Now()
}
func (p *ConnProfiler) watchHandshakes() {
ticker := time.NewTicker(300 * time.Millisecond)
for {
select {
case _ = <-ticker.C:
p.checkHandshakes()
}
}
}
func (p *ConnProfiler) checkHandshakes() {
stats, err := p.wgIface.GetAllStat()
if err != nil {
log.Errorf("watchHandshakes: %v", err)
return
}
p.profilesMu.Lock()
for peerID, profile := range p.profiles {
if !profile.WireGuardConnected.IsZero() {
continue
}
stat, ok := stats[peerID]
if !ok {
continue
}
if stat.LastHandshake.IsZero() {
continue
}
profile.WireGuardConnected = stat.LastHandshake
}
p.profilesMu.Unlock()
}

46
connprofile/report.go Normal file
View File

@ -0,0 +1,46 @@
package connprofile
import (
"encoding/json"
"time"
log "github.com/sirupsen/logrus"
)
type Report struct {
NetworkMapUpdate time.Time
OfferSent float64
OfferReceived float64
WireGuardConfigured float64
WireGuardConnected float64
}
func report() {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case _ = <-ticker.C:
printJson()
}
}
}
func printJson() {
profiles := Profiler.GetProfiles()
reports := make(map[string]Report)
for key, profile := range profiles {
reports[key] = Report{
NetworkMapUpdate: profile.NetworkMapUpdate,
OfferSent: profile.OfferSent.Sub(profile.NetworkMapUpdate).Seconds(),
OfferReceived: profile.OfferReceived.Sub(profile.OfferSent).Seconds(),
WireGuardConfigured: profile.WireGuardConfigured.Sub(profile.OfferReceived).Seconds(),
WireGuardConnected: profile.WireGuardConnected.Sub(profile.WireGuardConfigured).Seconds(),
}
}
jsonData, err := json.MarshalIndent(reports, "", " ")
if err != nil {
log.Errorf("failed to marshal profiles: %v", err)
}
log.Infof("profiles: %s", jsonData)
}

10
connprofile/static.go Normal file
View File

@ -0,0 +1,10 @@
package connprofile
var (
Profiler *ConnProfiler
)
func init() {
Profiler = NewConnProfiler()
go report()
}