Merge branch 'main' into userspace-router

Viktor Liu 2025-01-29 22:00:41 +01:00
commit 4d635e3c2f
24 changed files with 338 additions and 303 deletions


@ -9,7 +9,7 @@ on:
pull_request:
env:
SIGN_PIPE_VER: "v0.0.17"
SIGN_PIPE_VER: "v0.0.18"
GORELEASER_VER: "v2.3.2"
PRODUCT_NAME: "NetBird"
COPYRIGHT: "Wiretrustee UG (haftungsbeschreankt)"


@ -190,7 +190,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command) error {
r.GetFullStatus()
connectClient := internal.NewConnectClient(ctx, config, r)
return connectClient.Run()
return connectClient.Run(nil)
}
func runInDaemonMode(ctx context.Context, cmd *cobra.Command) error {


@ -72,7 +72,8 @@ func TestIptablesManager(t *testing.T) {
t.Run("add second rule", func(t *testing.T) {
ip := net.ParseIP("10.20.0.3")
port := &fw.Port{
Values: []uint16{8043: 8046},
IsRange: true,
Values: []uint16{8043, 8046},
}
rule2, err = manager.AddPeerFiltering(ip, "tcp", port, nil, fw.ActionAccept, "", "accept HTTPS traffic from ports range")
require.NoError(t, err, "failed to add rule")
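The fix above replaces an accidental sparse-array literal (in Go, []uint16{8043: 8046} creates a 8044-element slice with index 8043 set to 8046) with an explicit port range. A minimal sketch of the two port shapes, assuming fw.Port carries the IsRange and Values fields shown in the test:

// hedged sketch, assuming the fw.Port fields used in the test above
rangePort := &fw.Port{
	IsRange: true,
	Values:  []uint16{8043, 8046}, // treated as the range 8043-8046
}
listPort := &fw.Port{
	Values: []uint16{80, 443}, // treated as two individual ports
}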


@ -2,7 +2,6 @@ package nftables
import (
"bytes"
"encoding/binary"
"fmt"
"net"
"slices"
@ -349,6 +348,10 @@ func (m *AclManager) addIOFiltering(
UserData: userData,
})
if err := m.rConn.Flush(); err != nil {
return nil, fmt.Errorf(flushError, err)
}
rule := &Rule{
nftRule: nftRule,
mangleRule: m.createPreroutingRule(expressions, userData),
@ -360,6 +363,7 @@ func (m *AclManager) addIOFiltering(
if ipset != nil {
m.ipsetStore.AddReferenceToIpset(ipset.Name)
}
return rule, nil
}
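With the fmt import back, addIOFiltering flushes the connection and checks the error before wrapping the kernel rule, so a failed flush no longer returns a rule object that was never installed. The add-then-flush shape with github.com/google/nftables looks roughly like this (a sketch, assuming table, chain, expressions and userData are already built):

// rules are only queued on the connection; Flush applies them to the kernel
nftRule := conn.AddRule(&nftables.Rule{
	Table:    table,
	Chain:    chain,
	Exprs:    expressions,
	UserData: userData,
})
if err := conn.Flush(); err != nil {
	return nil, fmt.Errorf("flush: %w", err) // the real code wraps with a flushError constant
}
// only now does nftRule exist in the kernel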
@ -700,12 +704,6 @@ func generatePeerRuleId(ip net.IP, sPort *firewall.Port, dPort *firewall.Port, a
return "set:" + ipset.Name + rulesetID
}
func encodePort(port firewall.Port) []byte {
bs := make([]byte, 2)
binary.BigEndian.PutUint16(bs, uint16(port.Values[0]))
return bs
}
func ifname(n string) []byte {
b := make([]byte, 16)
copy(b, n+"\x00")


@ -59,13 +59,8 @@ func NewConnectClient(
}
// Run with main logic.
func (c *ConnectClient) Run() error {
return c.run(MobileDependency{}, nil, nil)
}
// RunWithProbes runs the client's main logic with probes attached
func (c *ConnectClient) RunWithProbes(probes *ProbeHolder, runningChan chan error) error {
return c.run(MobileDependency{}, probes, runningChan)
func (c *ConnectClient) Run(runningChan chan error) error {
return c.run(MobileDependency{}, runningChan)
}
// RunOnAndroid with main logic on mobile system
@ -84,7 +79,7 @@ func (c *ConnectClient) RunOnAndroid(
HostDNSAddresses: dnsAddresses,
DnsReadyListener: dnsReadyListener,
}
return c.run(mobileDependency, nil, nil)
return c.run(mobileDependency, nil)
}
func (c *ConnectClient) RunOniOS(
@ -102,10 +97,10 @@ func (c *ConnectClient) RunOniOS(
DnsManager: dnsManager,
StateFilePath: stateFilePath,
}
return c.run(mobileDependency, nil, nil)
return c.run(mobileDependency, nil)
}
func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHolder, runningChan chan error) error {
func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan error) error {
defer func() {
if r := recover(); r != nil {
log.Panicf("Panic occurred: %v, stack trace: %s", r, string(debug.Stack()))
@ -261,7 +256,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
checks := loginResp.GetChecks()
c.engineMutex.Lock()
c.engine = NewEngineWithProbes(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, probes, checks)
c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks)
c.engine.SetNetworkMapPersistence(c.persistNetworkMap)
c.engineMutex.Unlock()
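RunWithProbes is gone; Run now takes the optional runningChan directly, and callers pass nil when they do not need the readiness signal. A sketch of the two call shapes, assuming a *internal.ConnectClient constructed as above and runningChan reporting the first startup result as in the daemon server:

// foreground: block until the client exits
if err := connectClient.Run(nil); err != nil {
	log.Errorf("client exited: %v", err)
}

// daemon: get notified once the engine is up while Run keeps retrying in the background
runningChan := make(chan error, 1)
go func() {
	if err := connectClient.Run(runningChan); err != nil {
		log.Debugf("run exited: %v", err)
	}
}()
if err := <-runningChan; err != nil {
	log.Errorf("client failed to start: %v", err)
}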


@ -175,8 +175,6 @@ type Engine struct {
dnsServer dns.Server
probes *ProbeHolder
// checks are the client-applied posture checks that need to be evaluated on the client
checks []*mgmProto.Checks
@ -200,7 +198,7 @@ type localIpUpdater interface {
UpdateLocalIPs() error
}
// NewEngine creates a new Connection Engine
// NewEngine creates a new Connection Engine with probes attached
func NewEngine(
clientCtx context.Context,
clientCancel context.CancelFunc,
@ -211,33 +209,6 @@ func NewEngine(
mobileDep MobileDependency,
statusRecorder *peer.Status,
checks []*mgmProto.Checks,
) *Engine {
return NewEngineWithProbes(
clientCtx,
clientCancel,
signalClient,
mgmClient,
relayManager,
config,
mobileDep,
statusRecorder,
nil,
checks,
)
}
// NewEngineWithProbes creates a new Connection Engine with probes attached
func NewEngineWithProbes(
clientCtx context.Context,
clientCancel context.CancelFunc,
signalClient signal.Client,
mgmClient mgm.Client,
relayManager *relayClient.Manager,
config *EngineConfig,
mobileDep MobileDependency,
statusRecorder *peer.Status,
probes *ProbeHolder,
checks []*mgmProto.Checks,
) *Engine {
engine := &Engine{
clientCtx: clientCtx,
@ -255,7 +226,6 @@ func NewEngineWithProbes(
networkSerial: 0,
sshServerFunc: nbssh.DefaultSSHServer,
statusRecorder: statusRecorder,
probes: probes,
checks: checks,
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
}
@ -454,7 +424,6 @@ func (e *Engine) Start() error {
e.receiveSignalEvents()
e.receiveManagementEvents()
e.receiveProbeEvents()
// starting network monitor at the very last to avoid disruptions
e.startNetworkMonitor()
@ -1530,72 +1499,58 @@ func (e *Engine) getRosenpassAddr() string {
return ""
}
func (e *Engine) receiveProbeEvents() {
if e.probes == nil {
return
// RunHealthProbes executes health checks for Signal, Management, Relay and WireGuard services
// and updates the status recorder with the latest states.
func (e *Engine) RunHealthProbes() bool {
signalHealthy := e.signal.IsHealthy()
log.Debugf("signal health check: healthy=%t", signalHealthy)
managementHealthy := e.mgmClient.IsHealthy()
log.Debugf("management health check: healthy=%t", managementHealthy)
results := append(e.probeSTUNs(), e.probeTURNs()...)
e.statusRecorder.UpdateRelayStates(results)
relayHealthy := true
for _, res := range results {
if res.Err != nil {
relayHealthy = false
break
}
}
if e.probes.SignalProbe != nil {
go e.probes.SignalProbe.Receive(e.ctx, func() bool {
healthy := e.signal.IsHealthy()
log.Debugf("received signal probe request, healthy: %t", healthy)
return healthy
})
log.Debugf("relay health check: healthy=%t", relayHealthy)
for _, key := range e.peerStore.PeersPubKey() {
wgStats, err := e.wgInterface.GetStats(key)
if err != nil {
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
continue
}
// wgStats could be zero value, in which case we just reset the stats
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
}
}
if e.probes.MgmProbe != nil {
go e.probes.MgmProbe.Receive(e.ctx, func() bool {
healthy := e.mgmClient.IsHealthy()
log.Debugf("received management probe request, healthy: %t", healthy)
return healthy
})
}
if e.probes.RelayProbe != nil {
go e.probes.RelayProbe.Receive(e.ctx, func() bool {
healthy := true
results := append(e.probeSTUNs(), e.probeTURNs()...)
e.statusRecorder.UpdateRelayStates(results)
// A single failed server will result in a "failed" probe
for _, res := range results {
if res.Err != nil {
healthy = false
break
}
}
log.Debugf("received relay probe request, healthy: %t", healthy)
return healthy
})
}
if e.probes.WgProbe != nil {
go e.probes.WgProbe.Receive(e.ctx, func() bool {
log.Debug("received wg probe request")
for _, key := range e.peerStore.PeersPubKey() {
wgStats, err := e.wgInterface.GetStats(key)
if err != nil {
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
}
// wgStats could be zero value, in which case we just reset the stats
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
}
}
return true
})
}
allHealthy := signalHealthy && managementHealthy && relayHealthy
log.Debugf("all health checks completed: healthy=%t", allHealthy)
return allHealthy
}
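RunHealthProbes turns the old push-style probes into a single pull: callers ask the engine for one combined verdict instead of wiring four Probe channels. A minimal usage sketch, assuming a started, non-nil *Engine:

if healthy := engine.RunHealthProbes(); !healthy {
	log.Warn("signal, management, relay or wireguard health check failed")
}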
func (e *Engine) probeSTUNs() []relay.ProbeResult {
return relay.ProbeAll(e.ctx, relay.ProbeSTUN, e.STUNs)
e.syncMsgMux.Lock()
stuns := slices.Clone(e.STUNs)
e.syncMsgMux.Unlock()
return relay.ProbeAll(e.ctx, relay.ProbeSTUN, stuns)
}
func (e *Engine) probeTURNs() []relay.ProbeResult {
return relay.ProbeAll(e.ctx, relay.ProbeTURN, e.TURNs)
e.syncMsgMux.Lock()
turns := slices.Clone(e.TURNs)
e.syncMsgMux.Unlock()
return relay.ProbeAll(e.ctx, relay.ProbeTURN, turns)
}
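Both probe helpers now snapshot the shared STUN/TURN lists under syncMsgMux before probing, so the slow network round-trips run without holding the engine lock. The general clone-under-lock pattern, as a small self-contained sketch:

import (
	"slices"
	"sync"
)

var (
	mu     sync.Mutex
	shared []string // stands in for e.STUNs / e.TURNs
)

// snapshot copies the list under the lock so callers can iterate lock-free.
func snapshot() []string {
	mu.Lock()
	defer mu.Unlock()
	return slices.Clone(shared)
}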
func (e *Engine) restartEngine() {


@ -1,58 +0,0 @@
package internal
import "context"
type ProbeHolder struct {
MgmProbe *Probe
SignalProbe *Probe
RelayProbe *Probe
WgProbe *Probe
}
// Probe allows to run on-demand callbacks from different code locations.
// Pass the probe to a receiving and a sending end. The receiving end starts listening
// to requests with Receive and executes a callback when the sending end requests it
// by calling Probe.
type Probe struct {
request chan struct{}
result chan bool
ready bool
}
// NewProbe returns a new initialized probe.
func NewProbe() *Probe {
return &Probe{
request: make(chan struct{}),
result: make(chan bool),
}
}
// Probe requests the callback to be run and returns a bool indicating success.
// It always returns true as long as the receiver is not ready.
func (p *Probe) Probe() bool {
if !p.ready {
return true
}
p.request <- struct{}{}
return <-p.result
}
// Receive starts listening for probe requests. On such a request it runs the supplied
// callback func which must return a bool indicating success.
// Blocks until the passed context is cancelled.
func (p *Probe) Receive(ctx context.Context, callback func() bool) {
p.ready = true
defer func() {
p.ready = false
}()
for {
select {
case <-ctx.Done():
return
case <-p.request:
p.result <- callback()
}
}
}


@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
runtime "runtime"
"time"
"github.com/hashicorp/go-multierror"
@ -439,7 +440,7 @@ func handlerType(rt *route.Route, useNewDNSRoute bool) int {
return handlerTypeStatic
}
if useNewDNSRoute {
if useNewDNSRoute && runtime.GOOS != "ios" {
return handlerTypeDomain
}
return handlerTypeDynamic


@ -63,12 +63,7 @@ type Server struct {
statusRecorder *peer.Status
sessionWatcher *internal.SessionWatcher
mgmProbe *internal.Probe
signalProbe *internal.Probe
relayProbe *internal.Probe
wgProbe *internal.Probe
lastProbe time.Time
lastProbe time.Time
persistNetworkMap bool
}
@ -86,12 +81,7 @@ func New(ctx context.Context, configPath, logFile string) *Server {
latestConfigInput: internal.ConfigInput{
ConfigPath: configPath,
},
logFile: logFile,
mgmProbe: internal.NewProbe(),
signalProbe: internal.NewProbe(),
relayProbe: internal.NewProbe(),
wgProbe: internal.NewProbe(),
logFile: logFile,
persistNetworkMap: true,
}
}
@ -202,14 +192,7 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, config *internal.Conf
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
s.connectClient.SetNetworkMapPersistence(s.persistNetworkMap)
probes := internal.ProbeHolder{
MgmProbe: s.mgmProbe,
SignalProbe: s.signalProbe,
RelayProbe: s.relayProbe,
WgProbe: s.wgProbe,
}
err := s.connectClient.RunWithProbes(&probes, runningChan)
err := s.connectClient.Run(runningChan)
if err != nil {
log.Debugf("run client connection exited with error: %v. Will retry in the background", err)
}
@ -676,9 +659,13 @@ func (s *Server) Down(ctx context.Context, _ *proto.DownRequest) (*proto.DownRes
// Status returns the daemon status
func (s *Server) Status(
_ context.Context,
ctx context.Context,
msg *proto.StatusRequest,
) (*proto.StatusResponse, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
s.mutex.Lock()
defer s.mutex.Unlock()
@ -707,14 +694,17 @@ func (s *Server) Status(
}
func (s *Server) runProbes() {
if time.Since(s.lastProbe) > probeThreshold {
managementHealthy := s.mgmProbe.Probe()
signalHealthy := s.signalProbe.Probe()
relayHealthy := s.relayProbe.Probe()
wgProbe := s.wgProbe.Probe()
if s.connectClient == nil {
return
}
// Update last time only if all probes were successful
if managementHealthy && signalHealthy && relayHealthy && wgProbe {
engine := s.connectClient.Engine()
if engine == nil {
return
}
if time.Since(s.lastProbe) > probeThreshold {
if engine.RunHealthProbes() {
s.lastProbe = time.Now()
}
}
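runProbes now reaches through the connect client to the engine and keeps the probeThreshold throttle, so repeated Status calls cannot hammer the relays. Note that lastProbe only advances on success, which means a failed round is retried on the next Status call; the shape of the throttle:

// hedged sketch of the throttle used above
if time.Since(s.lastProbe) > probeThreshold {
	if engine.RunHealthProbes() {
		s.lastProbe = time.Now() // advance only on success so failures retry soon
	}
}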

go.mod

@ -92,7 +92,7 @@ require (
goauthentik.io/api/v3 v3.2023051.3
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/mobile v0.0.0-20231127183840-76ac6878050a
golang.org/x/net v0.30.0
golang.org/x/net v0.33.0
golang.org/x/oauth2 v0.19.0
golang.org/x/sync v0.10.0
golang.org/x/term v0.28.0

go.sum

@ -883,8 +883,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=


@ -3005,6 +3005,8 @@ func peerShouldReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage)
}
func BenchmarkSyncAndMarkPeer(b *testing.B) {
b.Setenv("NB_GET_ACCOUNT_BUFFER_INTERVAL", "0")
benchCases := []struct {
name string
peers int
@ -3015,10 +3017,10 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
minMsPerOpCICD float64
maxMsPerOpCICD float64
}{
{"Small", 50, 5, 1, 3, 3, 19},
{"Medium", 500, 100, 7, 13, 10, 90},
{"Large", 5000, 200, 65, 80, 60, 240},
{"Small single", 50, 10, 1, 3, 3, 80},
{"Small", 50, 5, 1, 5, 3, 19},
{"Medium", 500, 100, 7, 22, 10, 90},
{"Large", 5000, 200, 65, 110, 60, 240},
{"Small single", 50, 10, 1, 4, 3, 80},
{"Medium single", 500, 10, 7, 13, 10, 37},
{"Large 5", 5000, 15, 65, 80, 60, 220},
}
@ -3072,6 +3074,7 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
}
func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
b.Setenv("NB_GET_ACCOUNT_BUFFER_INTERVAL", "0")
benchCases := []struct {
name string
peers int
@ -3082,12 +3085,12 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
minMsPerOpCICD float64
maxMsPerOpCICD float64
}{
{"Small", 50, 5, 102, 110, 3, 20},
{"Medium", 500, 100, 105, 140, 20, 110},
{"Large", 5000, 200, 160, 200, 120, 260},
{"Small single", 50, 10, 102, 110, 5, 40},
{"Medium single", 500, 10, 105, 140, 10, 60},
{"Large 5", 5000, 15, 160, 200, 60, 180},
{"Small", 50, 5, 2, 10, 3, 35},
{"Medium", 500, 100, 5, 40, 20, 110},
{"Large", 5000, 200, 60, 100, 120, 260},
{"Small single", 50, 10, 2, 10, 5, 40},
{"Medium single", 500, 10, 5, 40, 10, 60},
{"Large 5", 5000, 15, 60, 100, 60, 180},
}
log.SetOutput(io.Discard)
@ -3146,6 +3149,7 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
}
func BenchmarkLoginPeer_NewPeer(b *testing.B) {
b.Setenv("NB_GET_ACCOUNT_BUFFER_INTERVAL", "0")
benchCases := []struct {
name string
peers int
@ -3156,12 +3160,12 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) {
minMsPerOpCICD float64
maxMsPerOpCICD float64
}{
{"Small", 50, 5, 107, 120, 10, 80},
{"Medium", 500, 100, 105, 140, 30, 140},
{"Large", 5000, 200, 180, 220, 140, 300},
{"Small single", 50, 10, 107, 120, 10, 80},
{"Medium single", 500, 10, 105, 140, 20, 60},
{"Large 5", 5000, 15, 180, 220, 80, 200},
{"Small", 50, 5, 7, 20, 10, 80},
{"Medium", 500, 100, 5, 40, 30, 140},
{"Large", 5000, 200, 80, 120, 140, 300},
{"Small single", 50, 10, 7, 20, 10, 80},
{"Medium single", 500, 10, 5, 40, 20, 60},
{"Large 5", 5000, 15, 80, 120, 80, 200},
}
log.SetOutput(io.Discard)


@ -145,14 +145,14 @@ func BenchmarkGetAllPeers(b *testing.B) {
func BenchmarkDeletePeer(b *testing.B) {
var expectedMetrics = map[string]testing_tools.PerformanceMetrics{
"Peers - XS": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 16},
"Peers - S": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 16},
"Peers - M": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 16},
"Peers - L": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 16},
"Groups - L": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 16},
"Users - L": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 16},
"Setup Keys - L": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 16},
"Peers - XL": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 16},
"Peers - XS": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 18},
"Peers - S": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 18},
"Peers - M": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 18},
"Peers - L": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 18},
"Groups - L": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 18},
"Users - L": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 18},
"Setup Keys - L": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 18},
"Peers - XL": {MinMsPerOpLocal: 0, MaxMsPerOpLocal: 4, MinMsPerOpCICD: 2, MaxMsPerOpCICD: 18},
}
log.SetOutput(io.Discard)


@ -989,7 +989,7 @@ func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, is
return peer, emptyMap, nil, nil
}
account, err := am.Store.GetAccount(ctx, accountID)
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
return nil, nil, nil, err
}
@ -1130,11 +1130,6 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
}
start := time.Now()
defer func() {
if am.metrics != nil {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start))
}
}()
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
if err != nil {
@ -1175,6 +1170,9 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
}
wg.Wait()
if am.metrics != nil {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start))
}
}
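Recording the duration after wg.Wait() instead of in a defer keeps the happy-path measurement identical, but early returns (such as the validator error above) no longer emit a misleadingly short sample. The shape of the measurement:

// hedged sketch: record only completed fan-outs
start := time.Now()
// ... launch one goroutine per peer, each guarded by wg.Add(1)/wg.Done() ...
wg.Wait()
if am.metrics != nil {
	am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start))
}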
// UpdateAccountPeer updates a single peer that belongs to an account.


@ -22,7 +22,8 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
metric.WithUnit("milliseconds"),
metric.WithExplicitBucketBoundaries(
0.5, 1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000,
))
),
metric.WithDescription("Duration of triggering the account peers update and preparing the required data for the network map being sent to the clients"))
if err != nil {
return nil, err
}
@ -31,7 +32,8 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
metric.WithUnit("milliseconds"),
metric.WithExplicitBucketBoundaries(
0.1, 0.5, 1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000,
))
),
metric.WithDescription("Duration of calculating the peer network map that is sent to the clients"))
if err != nil {
return nil, err
}
@ -40,12 +42,15 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
metric.WithUnit("objects"),
metric.WithExplicitBucketBoundaries(
50, 100, 200, 500, 1000, 2500, 5000, 10000,
))
),
metric.WithDescription("Number of objects in the network map like peers, routes, firewall rules, etc. that are sent to the clients"))
if err != nil {
return nil, err
}
peerMetaUpdateCount, err := meter.Int64Counter("management.account.peer.meta.update.counter", metric.WithUnit("1"))
peerMetaUpdateCount, err := meter.Int64Counter("management.account.peer.meta.update.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates with new meta data from the peers"))
if err != nil {
return nil, err
}
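The same WithUnit plus WithDescription pattern repeats across the gRPC, HTTP, IDP, store, update-channel, relay and signal metrics below. A self-contained sketch of registering one such instrument with the OpenTelemetry Go API (instrument name and buckets here are illustrative, not from the commit):

package main

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

func newDurationHistogram() (metric.Int64Histogram, error) {
	meter := otel.Meter("example/meter")
	return meter.Int64Histogram("example.operation.duration.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("Duration of the example operation"),
		metric.WithExplicitBucketBoundaries(1, 5, 10, 50, 100, 500, 1000),
	)
}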


@ -22,32 +22,50 @@ type GRPCMetrics struct {
// NewGRPCMetrics creates new GRPCMetrics struct and registers common metrics of the gRPC server
func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, error) {
syncRequestsCounter, err := meter.Int64Counter("management.grpc.sync.request.counter", metric.WithUnit("1"))
syncRequestsCounter, err := meter.Int64Counter("management.grpc.sync.request.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of sync gRPC requests from the peers to establish a connection and receive network map updates (update channel)"),
)
if err != nil {
return nil, err
}
loginRequestsCounter, err := meter.Int64Counter("management.grpc.login.request.counter", metric.WithUnit("1"))
loginRequestsCounter, err := meter.Int64Counter("management.grpc.login.request.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
)
if err != nil {
return nil, err
}
getKeyRequestsCounter, err := meter.Int64Counter("management.grpc.key.request.counter", metric.WithUnit("1"))
getKeyRequestsCounter, err := meter.Int64Counter("management.grpc.key.request.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of key gRPC requests from the peers to get the server's public WireGuard key"),
)
if err != nil {
return nil, err
}
activeStreamsGauge, err := meter.Int64ObservableGauge("management.grpc.connected.streams", metric.WithUnit("1"))
activeStreamsGauge, err := meter.Int64ObservableGauge("management.grpc.connected.streams",
metric.WithUnit("1"),
metric.WithDescription("Number of active peer streams connected to the gRPC server"),
)
if err != nil {
return nil, err
}
syncRequestDuration, err := meter.Int64Histogram("management.grpc.sync.request.duration.ms", metric.WithUnit("milliseconds"))
syncRequestDuration, err := meter.Int64Histogram("management.grpc.sync.request.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of the sync gRPC requests from the peers to establish a connection and receive network map updates (update channel)"),
)
if err != nil {
return nil, err
}
loginRequestDuration, err := meter.Int64Histogram("management.grpc.login.request.duration.ms", metric.WithUnit("milliseconds"))
loginRequestDuration, err := meter.Int64Histogram("management.grpc.login.request.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of the login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
)
if err != nil {
return nil, err
}
@ -57,7 +75,7 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
// TODO(yury): This needs custom bucketing as we are interested in the values from 0 to server.channelBufferSize (100)
channelQueue, err := meter.Int64Histogram(
"management.grpc.updatechannel.queue",
metric.WithDescription("Number of update messages in the channel queue"),
metric.WithDescription("Number of update messages piling up in the update channel queue"),
metric.WithUnit("length"),
)
if err != nil {


@ -74,37 +74,58 @@ type HTTPMiddleware struct {
// NewMetricsMiddleware creates a new HTTPMiddleware
func NewMetricsMiddleware(ctx context.Context, meter metric.Meter) (*HTTPMiddleware, error) {
httpRequestCounter, err := meter.Int64Counter(httpRequestCounterPrefix, metric.WithUnit("1"))
httpRequestCounter, err := meter.Int64Counter(httpRequestCounterPrefix,
metric.WithUnit("1"),
metric.WithDescription("Number of incoming HTTP requests by endpoint and method"),
)
if err != nil {
return nil, err
}
httpResponseCounter, err := meter.Int64Counter(httpResponseCounterPrefix, metric.WithUnit("1"))
httpResponseCounter, err := meter.Int64Counter(httpResponseCounterPrefix,
metric.WithUnit("1"),
metric.WithDescription("Number of outgoing HTTP responses by endpoint, method and returned status code"),
)
if err != nil {
return nil, err
}
totalHTTPRequestsCounter, err := meter.Int64Counter(fmt.Sprintf("%s.total", httpRequestCounterPrefix), metric.WithUnit("1"))
totalHTTPRequestsCounter, err := meter.Int64Counter(fmt.Sprintf("%s.total", httpRequestCounterPrefix),
metric.WithUnit("1"),
metric.WithDescription("Number of incoming HTTP requests"),
)
if err != nil {
return nil, err
}
totalHTTPResponseCounter, err := meter.Int64Counter(fmt.Sprintf("%s.total", httpResponseCounterPrefix), metric.WithUnit("1"))
totalHTTPResponseCounter, err := meter.Int64Counter(fmt.Sprintf("%s.total", httpResponseCounterPrefix),
metric.WithUnit("1"),
metric.WithDescription("Number of outgoing HTTP responses"),
)
if err != nil {
return nil, err
}
totalHTTPResponseCodeCounter, err := meter.Int64Counter(fmt.Sprintf("%s.code.total", httpResponseCounterPrefix), metric.WithUnit("1"))
totalHTTPResponseCodeCounter, err := meter.Int64Counter(fmt.Sprintf("%s.code.total", httpResponseCounterPrefix),
metric.WithUnit("1"),
metric.WithDescription("Number of outgoing HTTP responses by status code"),
)
if err != nil {
return nil, err
}
httpRequestDuration, err := meter.Int64Histogram(httpRequestDurationPrefix, metric.WithUnit("milliseconds"))
httpRequestDuration, err := meter.Int64Histogram(httpRequestDurationPrefix,
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of incoming HTTP requests by endpoint and method"),
)
if err != nil {
return nil, err
}
totalHTTPRequestDuration, err := meter.Int64Histogram(fmt.Sprintf("%s.total", httpRequestDurationPrefix), metric.WithUnit("milliseconds"))
totalHTTPRequestDuration, err := meter.Int64Histogram(fmt.Sprintf("%s.total", httpRequestDurationPrefix),
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of incoming HTTP requests"),
)
if err != nil {
return nil, err
}


@ -23,43 +23,73 @@ type IDPMetrics struct {
// NewIDPMetrics creates new IDPMetrics struct and registers common
func NewIDPMetrics(ctx context.Context, meter metric.Meter) (*IDPMetrics, error) {
metaUpdateCounter, err := meter.Int64Counter("management.idp.update.user.meta.counter", metric.WithUnit("1"))
metaUpdateCounter, err := meter.Int64Counter("management.idp.update.user.meta.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates of user metadata sent to the configured identity provider"),
)
if err != nil {
return nil, err
}
getUserByEmailCounter, err := meter.Int64Counter("management.idp.get.user.by.email.counter", metric.WithUnit("1"))
getUserByEmailCounter, err := meter.Int64Counter("management.idp.get.user.by.email.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of requests to get a user by email from the configured identity provider"),
)
if err != nil {
return nil, err
}
getAllAccountsCounter, err := meter.Int64Counter("management.idp.get.accounts.counter", metric.WithUnit("1"))
getAllAccountsCounter, err := meter.Int64Counter("management.idp.get.accounts.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of requests to get all accounts from the configured identity provider"),
)
if err != nil {
return nil, err
}
createUserCounter, err := meter.Int64Counter("management.idp.create.user.counter", metric.WithUnit("1"))
createUserCounter, err := meter.Int64Counter("management.idp.create.user.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of requests to create a new user in the configured identity provider"),
)
if err != nil {
return nil, err
}
deleteUserCounter, err := meter.Int64Counter("management.idp.delete.user.counter", metric.WithUnit("1"))
deleteUserCounter, err := meter.Int64Counter("management.idp.delete.user.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of requests to delete a user from the configured identity provider"),
)
if err != nil {
return nil, err
}
getAccountCounter, err := meter.Int64Counter("management.idp.get.account.counter", metric.WithUnit("1"))
getAccountCounter, err := meter.Int64Counter("management.idp.get.account.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of requests to get all users in an account from the configured identity provider"),
)
if err != nil {
return nil, err
}
getUserByIDCounter, err := meter.Int64Counter("management.idp.get.user.by.id.counter", metric.WithUnit("1"))
getUserByIDCounter, err := meter.Int64Counter("management.idp.get.user.by.id.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of requests to get a user by ID from the configured identity provider"),
)
if err != nil {
return nil, err
}
authenticateRequestCounter, err := meter.Int64Counter("management.idp.authenticate.request.counter", metric.WithUnit("1"))
authenticateRequestCounter, err := meter.Int64Counter("management.idp.authenticate.request.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of requests to authenticate the server with the configured identity provider"),
)
if err != nil {
return nil, err
}
requestErrorCounter, err := meter.Int64Counter("management.idp.request.error.counter", metric.WithUnit("1"))
requestErrorCounter, err := meter.Int64Counter("management.idp.request.error.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of errors that happened when doing http request to the configured identity provider"),
)
if err != nil {
return nil, err
}
requestStatusErrorCounter, err := meter.Int64Counter("management.idp.request.status.error.counter", metric.WithUnit("1"))
requestStatusErrorCounter, err := meter.Int64Counter("management.idp.request.status.error.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of responses that came from the configured identity provider with non success status code"),
)
if err != nil {
return nil, err
}


@ -20,28 +20,41 @@ type StoreMetrics struct {
// NewStoreMetrics creates an instance of StoreMetrics
func NewStoreMetrics(ctx context.Context, meter metric.Meter) (*StoreMetrics, error) {
globalLockAcquisitionDurationMicro, err := meter.Int64Histogram("management.store.global.lock.acquisition.duration.micro",
metric.WithUnit("microseconds"))
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to acquire the global lock in the store to block all other requests to the store"),
)
if err != nil {
return nil, err
}
globalLockAcquisitionDurationMs, err := meter.Int64Histogram("management.store.global.lock.acquisition.duration.ms")
globalLockAcquisitionDurationMs, err := meter.Int64Histogram("management.store.global.lock.acquisition.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of how long a process holds the acquired global lock in the store"),
)
if err != nil {
return nil, err
}
persistenceDurationMicro, err := meter.Int64Histogram("management.store.persistence.duration.micro",
metric.WithUnit("microseconds"))
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to save or delete an account in the store"),
)
if err != nil {
return nil, err
}
persistenceDurationMs, err := meter.Int64Histogram("management.store.persistence.duration.ms")
persistenceDurationMs, err := meter.Int64Histogram("management.store.persistence.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of how long it takes to save or delete an account in the store"),
)
if err != nil {
return nil, err
}
transactionDurationMs, err := meter.Int64Histogram("management.store.transaction.duration.ms")
transactionDurationMs, err := meter.Int64Histogram("management.store.transaction.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of how long it takes to execute a transaction in the store"),
)
if err != nil {
return nil, err
}


@ -23,42 +23,68 @@ type UpdateChannelMetrics struct {
// NewUpdateChannelMetrics creates an instance of UpdateChannel
func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) {
createChannelDurationMicro, err := meter.Int64Histogram("management.updatechannel.create.duration.micro")
createChannelDurationMicro, err := meter.Int64Histogram("management.updatechannel.create.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to create a new peer update channel"),
)
if err != nil {
return nil, err
}
closeChannelDurationMicro, err := meter.Int64Histogram("management.updatechannel.close.one.duration.micro")
closeChannelDurationMicro, err := meter.Int64Histogram("management.updatechannel.close.one.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to close a peer update channel"),
)
if err != nil {
return nil, err
}
closeChannelsDurationMicro, err := meter.Int64Histogram("management.updatechannel.close.multiple.duration.micro")
closeChannelsDurationMicro, err := meter.Int64Histogram("management.updatechannel.close.multiple.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to close a set of peer update channels"),
)
if err != nil {
return nil, err
}
closeChannels, err := meter.Int64Histogram("management.updatechannel.close.multiple.channels")
closeChannels, err := meter.Int64Histogram("management.updatechannel.close.multiple.channels",
metric.WithUnit("1"),
metric.WithDescription("Number of peer update channels that have been closed"),
)
if err != nil {
return nil, err
}
sendUpdateDurationMicro, err := meter.Int64Histogram("management.updatechannel.send.duration.micro")
sendUpdateDurationMicro, err := meter.Int64Histogram("management.updatechannel.send.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to send an network map update to a peer"),
)
if err != nil {
return nil, err
}
getAllConnectedPeersDurationMicro, err := meter.Int64Histogram("management.updatechannel.get.all.duration.micro")
getAllConnectedPeersDurationMicro, err := meter.Int64Histogram("management.updatechannel.get.all.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to get all connected peers"),
)
if err != nil {
return nil, err
}
getAllConnectedPeers, err := meter.Int64Histogram("management.updatechannel.get.all.peers")
getAllConnectedPeers, err := meter.Int64Histogram("management.updatechannel.get.all.peers",
metric.WithUnit("1"),
metric.WithDescription("Number of connected peers"),
)
if err != nil {
return nil, err
}
hasChannelDurationMicro, err := meter.Int64Histogram("management.updatechannel.haschannel.duration.micro")
hasChannelDurationMicro, err := meter.Int64Histogram("management.updatechannel.haschannel.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to check if a peer has a channel"),
)
if err != nil {
return nil, err
}


@ -289,14 +289,14 @@ func (a *Account) GetPeerNetworkMap(
}
if metrics != nil {
objectCount := int64(len(peersToConnect) + len(expiredPeers) + len(routesUpdate) + len(firewallRules))
objectCount := int64(len(peersToConnectIncludingRouters) + len(expiredPeers) + len(routesUpdate) + len(networkResourcesRoutes) + len(firewallRules) + len(networkResourcesFirewallRules) + len(routesFirewallRules))
metrics.CountNetworkMapObjects(objectCount)
metrics.CountGetPeerNetworkMapDuration(time.Since(start))
if objectCount > 5000 {
log.WithContext(ctx).Tracef("account: %s has a total resource count of %d objects, "+
"peers to connect: %d, expired peers: %d, routes: %d, firewall rules: %d",
a.Id, objectCount, len(peersToConnect), len(expiredPeers), len(routesUpdate), len(firewallRules))
"peers to connect: %d, expired peers: %d, routes: %d, firewall rules: %d, network resources routes: %d, network resources firewall rules: %d, routes firewall rules: %d",
a.Id, objectCount, len(peersToConnectIncludingRouters), len(expiredPeers), len(routesUpdate), len(firewallRules), len(networkResourcesRoutes), len(networkResourcesFirewallRules), len(routesFirewallRules))
}
}


@ -66,18 +66,20 @@ type PolicyRule struct {
// Copy returns a copy of a policy rule
func (pm *PolicyRule) Copy() *PolicyRule {
rule := &PolicyRule{
ID: pm.ID,
PolicyID: pm.PolicyID,
Name: pm.Name,
Description: pm.Description,
Enabled: pm.Enabled,
Action: pm.Action,
Destinations: make([]string, len(pm.Destinations)),
Sources: make([]string, len(pm.Sources)),
Bidirectional: pm.Bidirectional,
Protocol: pm.Protocol,
Ports: make([]string, len(pm.Ports)),
PortRanges: make([]RulePortRange, len(pm.PortRanges)),
ID: pm.ID,
PolicyID: pm.PolicyID,
Name: pm.Name,
Description: pm.Description,
Enabled: pm.Enabled,
Action: pm.Action,
Destinations: make([]string, len(pm.Destinations)),
DestinationResource: pm.DestinationResource,
Sources: make([]string, len(pm.Sources)),
SourceResource: pm.SourceResource,
Bidirectional: pm.Bidirectional,
Protocol: pm.Protocol,
Ports: make([]string, len(pm.Ports)),
PortRanges: make([]RulePortRange, len(pm.PortRanges)),
}
copy(rule.Destinations, pm.Destinations)
copy(rule.Sources, pm.Sources)
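Copy deep-copies every slice field (make plus copy) while the new DestinationResource and SourceResource fields are plain values copied by assignment. Skipping the deep copy would alias the backing arrays; a tiny sketch of the failure mode:

orig := []string{"group-a"}
alias := orig                      // same backing array
deep := make([]string, len(orig))
copy(deep, orig)                   // independent backing array
alias[0] = "group-b"               // visible through orig, but not through deep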


@ -29,37 +29,53 @@ type Metrics struct {
}
func NewMetrics(ctx context.Context, meter metric.Meter) (*Metrics, error) {
bytesSent, err := meter.Int64Counter("relay_transfer_sent_bytes_total")
bytesSent, err := meter.Int64Counter("relay_transfer_sent_bytes_total",
metric.WithDescription("Total number of bytes sent to peers"),
)
if err != nil {
return nil, err
}
bytesRecv, err := meter.Int64Counter("relay_transfer_received_bytes_total")
bytesRecv, err := meter.Int64Counter("relay_transfer_received_bytes_total",
metric.WithDescription("Total number of bytes received from peers"),
)
if err != nil {
return nil, err
}
peers, err := meter.Int64UpDownCounter("relay_peers")
peers, err := meter.Int64UpDownCounter("relay_peers",
metric.WithDescription("Number of connected peers"),
)
if err != nil {
return nil, err
}
peersActive, err := meter.Int64ObservableGauge("relay_peers_active")
peersActive, err := meter.Int64ObservableGauge("relay_peers_active",
metric.WithDescription("Number of active connected peers"),
)
if err != nil {
return nil, err
}
peersIdle, err := meter.Int64ObservableGauge("relay_peers_idle")
peersIdle, err := meter.Int64ObservableGauge("relay_peers_idle",
metric.WithDescription("Number of idle connected peers"),
)
if err != nil {
return nil, err
}
authTime, err := meter.Float64Histogram("relay_peer_authentication_time_milliseconds", metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...))
authTime, err := meter.Float64Histogram("relay_peer_authentication_time_milliseconds",
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
metric.WithDescription("Time taken to authenticate a peer"),
)
if err != nil {
return nil, err
}
peerStoreTime, err := meter.Float64Histogram("relay_peer_store_time_milliseconds", metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...))
peerStoreTime, err := meter.Float64Histogram("relay_peer_store_time_milliseconds",
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
metric.WithDescription("Time taken to store a new peer connection"),
)
if err != nil {
return nil, err
}


@ -23,56 +23,76 @@ type AppMetrics struct {
}
func NewAppMetrics(meter metric.Meter) (*AppMetrics, error) {
activePeers, err := meter.Int64UpDownCounter("active_peers")
activePeers, err := meter.Int64UpDownCounter("active_peers",
metric.WithDescription("Number of active connected peers"),
)
if err != nil {
return nil, err
}
peerConnectionDuration, err := meter.Int64Histogram("peer_connection_duration_seconds",
metric.WithExplicitBucketBoundaries(getPeerConnectionDurationBucketBoundaries()...))
metric.WithExplicitBucketBoundaries(getPeerConnectionDurationBucketBoundaries()...),
metric.WithDescription("Duration of how long a peer was connected"),
)
if err != nil {
return nil, err
}
registrations, err := meter.Int64Counter("registrations_total")
registrations, err := meter.Int64Counter("registrations_total",
metric.WithDescription("Total number of peer registrations"),
)
if err != nil {
return nil, err
}
deregistrations, err := meter.Int64Counter("deregistrations_total")
deregistrations, err := meter.Int64Counter("deregistrations_total",
metric.WithDescription("Total number of peer deregistrations"),
)
if err != nil {
return nil, err
}
registrationFailures, err := meter.Int64Counter("registration_failures_total")
registrationFailures, err := meter.Int64Counter("registration_failures_total",
metric.WithDescription("Total number of peer registration failures"),
)
if err != nil {
return nil, err
}
registrationDelay, err := meter.Float64Histogram("registration_delay_milliseconds",
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...))
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
metric.WithDescription("Duration of how long it takes to register a peer"),
)
if err != nil {
return nil, err
}
getRegistrationDelay, err := meter.Float64Histogram("get_registration_delay_milliseconds",
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...))
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
metric.WithDescription("Duration of how long it takes to load a connection from the registry"),
)
if err != nil {
return nil, err
}
messagesForwarded, err := meter.Int64Counter("messages_forwarded_total")
messagesForwarded, err := meter.Int64Counter("messages_forwarded_total",
metric.WithDescription("Total number of messages forwarded to peers"),
)
if err != nil {
return nil, err
}
messageForwardFailures, err := meter.Int64Counter("message_forward_failures_total")
messageForwardFailures, err := meter.Int64Counter("message_forward_failures_total",
metric.WithDescription("Total number of message forwarding failures"),
)
if err != nil {
return nil, err
}
messageForwardLatency, err := meter.Float64Histogram("message_forward_latency_milliseconds",
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...))
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
metric.WithDescription("Duration of how long it takes to forward a message to a peer"),
)
if err != nil {
return nil, err
}