Add metrics

This commit is contained in:
Zoltán Papp 2024-07-24 16:26:26 +02:00
parent 4802b83ef9
commit ecb6f0831e
6 changed files with 156 additions and 53 deletions

View File

@ -11,6 +11,7 @@ import (
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"github.com/netbirdio/netbird/relay/auth"
"github.com/netbirdio/netbird/relay/auth/hmac"
@ -35,7 +36,10 @@ func TestMain(m *testing.M) {
func TestClient(t *testing.T) {
ctx := context.Background()
srv := server.NewServer(serverURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
listenCfg := server.ListenerConfig{Address: serverListenAddr}
@ -58,7 +62,7 @@ func TestClient(t *testing.T) {
}
t.Log("alice connecting to server")
clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice")
err := clientAlice.Connect()
err = clientAlice.Connect()
if err != nil {
t.Fatalf("failed to connect to server: %s", err)
}
@ -133,7 +137,10 @@ func transfer(t *testing.T, testData []byte, peerPairs int) {
serverAddress := fmt.Sprintf("127.0.0.1:%d", port)
serverConnURL := fmt.Sprintf("rel://%s", serverAddress)
srv := server.NewServer(serverConnURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverConnURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
listenCfg := server.ListenerConfig{Address: serverAddress}
@ -259,7 +266,10 @@ func transfer(t *testing.T, testData []byte, peerPairs int) {
func TestRegistration(t *testing.T) {
ctx := context.Background()
srvCfg := server.ListenerConfig{Address: serverListenAddr}
srv := server.NewServer(serverURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv.Listen(srvCfg)
@ -274,7 +284,7 @@ func TestRegistration(t *testing.T) {
}
clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice")
err := clientAlice.Connect()
err = clientAlice.Connect()
if err != nil {
_ = srv.Close()
t.Fatalf("failed to connect to server: %s", err)
@ -330,7 +340,10 @@ func TestEcho(t *testing.T) {
idAlice := "alice"
idBob := "bob"
srvCfg := server.ListenerConfig{Address: serverListenAddr}
srv := server.NewServer(serverURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv.Listen(srvCfg)
@ -352,7 +365,7 @@ func TestEcho(t *testing.T) {
}
clientAlice := NewClient(ctx, serverURL, hmacTokenStore, idAlice)
err := clientAlice.Connect()
err = clientAlice.Connect()
if err != nil {
t.Fatalf("failed to connect to server: %s", err)
}
@ -416,7 +429,10 @@ func TestBindToUnavailabePeer(t *testing.T) {
ctx := context.Background()
srvCfg := server.ListenerConfig{Address: serverListenAddr}
srv := server.NewServer(serverURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv.Listen(srvCfg)
@ -439,7 +455,7 @@ func TestBindToUnavailabePeer(t *testing.T) {
}
clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice")
err := clientAlice.Connect()
err = clientAlice.Connect()
if err != nil {
t.Errorf("failed to connect to server: %s", err)
}
@ -459,7 +475,10 @@ func TestBindReconnect(t *testing.T) {
ctx := context.Background()
srvCfg := server.ListenerConfig{Address: serverListenAddr}
srv := server.NewServer(serverURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv.Listen(srvCfg)
@ -482,7 +501,7 @@ func TestBindReconnect(t *testing.T) {
}
clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice")
err := clientAlice.Connect()
err = clientAlice.Connect()
if err != nil {
t.Errorf("failed to connect to server: %s", err)
}
@ -547,7 +566,10 @@ func TestCloseConn(t *testing.T) {
ctx := context.Background()
srvCfg := server.ListenerConfig{Address: serverListenAddr}
srv := server.NewServer(serverURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv.Listen(srvCfg)
@ -570,7 +592,7 @@ func TestCloseConn(t *testing.T) {
}
clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice")
err := clientAlice.Connect()
err = clientAlice.Connect()
if err != nil {
t.Errorf("failed to connect to server: %s", err)
}
@ -601,7 +623,10 @@ func TestCloseRelayConn(t *testing.T) {
ctx := context.Background()
srvCfg := server.ListenerConfig{Address: serverListenAddr}
srv := server.NewServer(serverURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv.Listen(srvCfg)
@ -623,7 +648,7 @@ func TestCloseRelayConn(t *testing.T) {
}
clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice")
err := clientAlice.Connect()
err = clientAlice.Connect()
if err != nil {
t.Fatalf("failed to connect to server: %s", err)
}
@ -650,7 +675,10 @@ func TestCloseByServer(t *testing.T) {
ctx := context.Background()
srvCfg := server.ListenerConfig{Address: serverListenAddr}
srv1 := server.NewServer(serverURL, false, av)
srv1, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
@ -668,7 +696,7 @@ func TestCloseByServer(t *testing.T) {
idAlice := "alice"
log.Debugf("connect by alice")
relayClient := NewClient(ctx, serverURL, hmacTokenStore, idAlice)
err := relayClient.Connect()
err = relayClient.Connect()
if err != nil {
log.Fatalf("failed to connect to server: %s", err)
}
@ -700,7 +728,10 @@ func TestCloseByClient(t *testing.T) {
ctx := context.Background()
srvCfg := server.ListenerConfig{Address: serverListenAddr}
srv := server.NewServer(serverURL, false, av)
srv, err := server.NewServer(otel.Meter(""), serverURL, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv.Listen(srvCfg)
@ -717,7 +748,7 @@ func TestCloseByClient(t *testing.T) {
idAlice := "alice"
log.Debugf("connect by alice")
relayClient := NewClient(ctx, serverURL, hmacTokenStore, idAlice)
err := relayClient.Connect()
err = relayClient.Connect()
if err != nil {
log.Fatalf("failed to connect to server: %s", err)
}

View File

@ -6,6 +6,7 @@ import (
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"github.com/netbirdio/netbird/relay/server"
)
@ -16,7 +17,10 @@ func TestForeignConn(t *testing.T) {
srvCfg1 := server.ListenerConfig{
Address: "localhost:1234",
}
srv1 := server.NewServer(srvCfg1.Address, false, av)
srv1, err := server.NewServer(otel.Meter(""), srvCfg1.Address, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv1.Listen(srvCfg1)
@ -39,7 +43,10 @@ func TestForeignConn(t *testing.T) {
srvCfg2 := server.ListenerConfig{
Address: "localhost:2234",
}
srv2 := server.NewServer(srvCfg2.Address, false, av)
srv2, err := server.NewServer(otel.Meter(""), srvCfg2.Address, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan2 := make(chan error, 1)
go func() {
err := srv2.Listen(srvCfg2)
@ -64,7 +71,7 @@ func TestForeignConn(t *testing.T) {
mCtx, cancel := context.WithCancel(ctx)
defer cancel()
clientAlice := NewManager(mCtx, toURL(srvCfg1), idAlice)
err := clientAlice.Serve()
err = clientAlice.Serve()
if err != nil {
t.Fatalf("failed to serve manager: %s", err)
}
@ -122,7 +129,10 @@ func TestForeginConnClose(t *testing.T) {
srvCfg1 := server.ListenerConfig{
Address: "localhost:1234",
}
srv1 := server.NewServer(srvCfg1.Address, false, av)
srv1, err := server.NewServer(otel.Meter(""), srvCfg1.Address, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv1.Listen(srvCfg1)
@ -145,7 +155,10 @@ func TestForeginConnClose(t *testing.T) {
srvCfg2 := server.ListenerConfig{
Address: "localhost:2234",
}
srv2 := server.NewServer(srvCfg2.Address, false, av)
srv2, err := server.NewServer(otel.Meter(""), srvCfg2.Address, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan2 := make(chan error, 1)
go func() {
err := srv2.Listen(srvCfg2)
@ -170,7 +183,7 @@ func TestForeginConnClose(t *testing.T) {
mCtx, cancel := context.WithCancel(ctx)
defer cancel()
mgr := NewManager(mCtx, toURL(srvCfg1), idAlice)
err := mgr.Serve()
err = mgr.Serve()
if err != nil {
t.Fatalf("failed to serve manager: %s", err)
}
@ -191,7 +204,10 @@ func TestForeginAutoClose(t *testing.T) {
srvCfg1 := server.ListenerConfig{
Address: "localhost:1234",
}
srv1 := server.NewServer(srvCfg1.Address, false, av)
srv1, err := server.NewServer(otel.Meter(""), srvCfg1.Address, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
t.Log("binding server 1.")
@ -217,7 +233,10 @@ func TestForeginAutoClose(t *testing.T) {
srvCfg2 := server.ListenerConfig{
Address: "localhost:2234",
}
srv2 := server.NewServer(srvCfg2.Address, false, av)
srv2, err := server.NewServer(otel.Meter(""), srvCfg2.Address, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan2 := make(chan error, 1)
go func() {
t.Log("binding server 2.")
@ -244,7 +263,7 @@ func TestForeginAutoClose(t *testing.T) {
mCtx, cancel := context.WithCancel(ctx)
defer cancel()
mgr := NewManager(mCtx, toURL(srvCfg1), idAlice)
err := mgr.Serve()
err = mgr.Serve()
if err != nil {
t.Fatalf("failed to serve manager: %s", err)
}
@ -277,7 +296,10 @@ func TestAutoReconnect(t *testing.T) {
srvCfg := server.ListenerConfig{
Address: "localhost:1234",
}
srv := server.NewServer(srvCfg.Address, false, av)
srv, err := server.NewServer(otel.Meter(""), srvCfg.Address, false, av)
if err != nil {
t.Fatalf("failed to create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
err := srv.Listen(srvCfg)
@ -300,7 +322,7 @@ func TestAutoReconnect(t *testing.T) {
mCtx, cancel := context.WithCancel(ctx)
defer cancel()
clientAlice := NewManager(mCtx, toURL(srvCfg), "alice")
err := clientAlice.Serve()
err = clientAlice.Serve()
if err != nil {
t.Fatalf("failed to serve manager: %s", err)
}

View File

@ -2,7 +2,9 @@ package main
import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
@ -14,9 +16,14 @@ import (
"github.com/netbirdio/netbird/encryption"
auth "github.com/netbirdio/netbird/relay/auth/hmac"
"github.com/netbirdio/netbird/relay/server"
"github.com/netbirdio/netbird/signal/metrics"
"github.com/netbirdio/netbird/util"
)
const (
metricsPort = 9090
)
type Config struct {
ListenAddress string
// in HA every peer connect to a common domain, the instance domain has been distributed during the p2p connection
@ -54,7 +61,7 @@ var (
Use: "relay",
Short: "Relay service",
Long: "Relay service for Netbird agents",
Run: execute,
RunE: execute,
}
)
@ -110,11 +117,10 @@ func loadConfig(configFile string) (*Config, error) {
return loadedConfig, err
}
func execute(cmd *cobra.Command, args []string) {
func execute(cmd *cobra.Command, args []string) error {
cfg, err := loadConfig(cfgFile)
if err != nil {
log.Errorf("failed to load config: %s", err)
os.Exit(1)
return fmt.Errorf("failed to load config: %s", err)
}
err = cfg.Validate()
@ -123,21 +129,31 @@ func execute(cmd *cobra.Command, args []string) {
os.Exit(1)
}
metricsServer, err := metrics.NewServer(metricsPort, "")
if err != nil {
return fmt.Errorf("setup metrics: %v", err)
}
go func() {
log.Infof("running metrics server: %s%s", metricsServer.Addr, metricsServer.Endpoint)
if err := metricsServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("Failed to start metrics server: %v", err)
}
}()
srvListenerCfg := server.ListenerConfig{
Address: cfg.ListenAddress,
}
if cfg.HasLetsEncrypt() {
tlsCfg, err := setupTLSCertManager(cfg.LetsencryptDataDir, cfg.LetsencryptDomains...)
if err != nil {
log.Errorf("%s", err)
os.Exit(1)
return fmt.Errorf("%s", err)
}
srvListenerCfg.TLSConfig = tlsCfg
} else if cfg.HasCertConfig() {
tlsCfg, err := encryption.LoadTLSConfig(cfg.TlsCertFile, cfg.TlsKeyFile)
if err != nil {
log.Errorf("%s", err)
os.Exit(1)
return fmt.Errorf("%s", err)
}
srvListenerCfg.TLSConfig = tlsCfg
}
@ -145,21 +161,23 @@ func execute(cmd *cobra.Command, args []string) {
tlsSupport := srvListenerCfg.TLSConfig != nil
authenticator := auth.NewTimedHMACValidator(cfg.AuthSecret, 24*time.Hour)
srv := server.NewServer(cfg.ExposedAddress, tlsSupport, authenticator)
srv, err := server.NewServer(metricsServer.Meter, cfg.ExposedAddress, tlsSupport, authenticator)
if err != nil {
return fmt.Errorf("failed to create relay server: %v", err)
}
log.Infof("server will be available on: %s", srv.InstanceURL())
err = srv.Listen(srvListenerCfg)
if err != nil {
log.Errorf("failed to bind server: %s", err)
os.Exit(1)
return fmt.Errorf("failed to bind server: %s", err)
}
waitForExitSignal()
err = srv.Close()
if err != nil {
log.Errorf("failed to close server: %s", err)
os.Exit(1)
return fmt.Errorf("failed to close server: %s", err)
}
return nil
}
func setupTLSCertManager(letsencryptDataDir string, letsencryptDomains ...string) (*tls.Config, error) {

21
relay/metrics/realy.go Normal file
View File

@ -0,0 +1,21 @@
package metrics
import "go.opentelemetry.io/otel/metric"
type Metrics struct {
metric.Meter
Peers metric.Int64UpDownCounter
}
func NewMetrics(meter metric.Meter) (*Metrics, error) {
peers, err := meter.Int64UpDownCounter("peers")
if err != nil {
return nil, err
}
return &Metrics{
Meter: meter,
Peers: peers,
}, nil
}

View File

@ -7,12 +7,15 @@ import (
"sync"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/metric"
"github.com/netbirdio/netbird/relay/auth"
"github.com/netbirdio/netbird/relay/messages"
"github.com/netbirdio/netbird/relay/metrics"
)
type Relay struct {
metrics *metrics.Metrics
validator auth.Validator
store *Store
@ -22,8 +25,14 @@ type Relay struct {
closeMu sync.RWMutex
}
func NewRelay(exposedAddress string, tlsSupport bool, validator auth.Validator) *Relay {
func NewRelay(meter metric.Meter, exposedAddress string, tlsSupport bool, validator auth.Validator) (*Relay, error) {
m, err := metrics.NewMetrics(meter)
if err != nil {
return nil, fmt.Errorf("creating app metrics: %v", err)
}
r := &Relay{
metrics: m,
validator: validator,
store: NewStore(),
}
@ -34,7 +43,7 @@ func NewRelay(exposedAddress string, tlsSupport bool, validator auth.Validator)
r.instaceURL = fmt.Sprintf("rel://%s", exposedAddress)
}
return r
return r, nil
}
func (r *Relay) Accept(conn net.Conn) {
@ -57,11 +66,12 @@ func (r *Relay) Accept(conn net.Conn) {
peer := NewPeer(peerID, conn, r.store)
peer.log.Infof("peer connected from: %s", conn.RemoteAddr())
r.store.AddPeer(peer)
r.metrics.Peers.Add(context.Background(), 1)
go func() {
peer.Work()
r.store.DeletePeer(peer)
peer.log.Debugf("relay connection closed")
r.metrics.Peers.Add(context.Background(), -1)
}()
}

View File

@ -8,6 +8,7 @@ import (
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/metric"
"github.com/netbirdio/netbird/relay/auth"
"github.com/netbirdio/netbird/relay/server/listener"
@ -26,14 +27,14 @@ type Server struct {
wSListener listener.Listener
}
func NewServer(exposedAddress string, tlsSupport bool, authValidator auth.Validator) *Server {
return &Server{
relay: NewRelay(
exposedAddress,
tlsSupport,
authValidator,
),
func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authValidator auth.Validator) (*Server, error) {
relay, err := NewRelay(meter, exposedAddress, tlsSupport, authValidator)
if err != nil {
return nil, err
}
return &Server{
relay: relay,
}, nil
}
func (r *Server) Listen(cfg ListenerConfig) error {