Feature/relay integration metrics (#2376)

Extend metrics

- TransferBytesSent
- Active/idle peers
- Connection times
This commit is contained in:
Zoltan Papp 2024-08-07 17:35:58 +02:00 committed by GitHub
parent 5400754954
commit 351db3dd49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 157 additions and 30 deletions

View File

@ -1,21 +1,136 @@
package metrics
import "go.opentelemetry.io/otel/metric"
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/metric"
)
const (
idleTimeout = 30 * time.Second
)
type Metrics struct {
metric.Meter
Peers metric.Int64UpDownCounter
TransferBytesSent metric.Int64Counter
TransferBytesRecv metric.Int64Counter
peers metric.Int64UpDownCounter
peerActivityChan chan string
peerLastActive map[string]time.Time
mutexActivity sync.Mutex
ctx context.Context
}
func NewMetrics(meter metric.Meter) (*Metrics, error) {
peers, err := meter.Int64UpDownCounter("peers")
func NewMetrics(ctx context.Context, meter metric.Meter) (*Metrics, error) {
bytesSent, err := meter.Int64Counter("relay_transfer_sent_bytes_total")
if err != nil {
return nil, err
}
return &Metrics{
Meter: meter,
Peers: peers,
}, nil
bytesRecv, err := meter.Int64Counter("relay_transfer_received_bytes_total")
if err != nil {
return nil, err
}
peers, err := meter.Int64UpDownCounter("relay_peers")
if err != nil {
return nil, err
}
peersActive, err := meter.Int64ObservableGauge("relay_peers_active")
if err != nil {
return nil, err
}
peersIdle, err := meter.Int64ObservableGauge("relay_peers_idle")
if err != nil {
return nil, err
}
m := &Metrics{
Meter: meter,
TransferBytesSent: bytesSent,
TransferBytesRecv: bytesRecv,
peers: peers,
ctx: ctx,
peerActivityChan: make(chan string, 10),
peerLastActive: make(map[string]time.Time),
}
_, err = meter.RegisterCallback(
func(ctx context.Context, o metric.Observer) error {
active, idle := m.calculateActiveIdleConnections()
o.ObserveInt64(peersActive, active)
o.ObserveInt64(peersIdle, idle)
return nil
},
peersActive, peersIdle,
)
if err != nil {
return nil, err
}
go m.readPeerActivity()
return m, nil
}
// PeerConnected increments the number of connected peers and increments number of idle connections
func (m *Metrics) PeerConnected(id string) {
m.peers.Add(m.ctx, 1)
m.mutexActivity.Lock()
defer m.mutexActivity.Unlock()
m.peerLastActive[id] = time.Time{}
}
// PeerDisconnected decrements the number of connected peers and decrements number of idle or active connections
func (m *Metrics) PeerDisconnected(id string) {
m.peers.Add(m.ctx, -1)
m.mutexActivity.Lock()
defer m.mutexActivity.Unlock()
delete(m.peerLastActive, id)
}
// PeerActivity increases the active connections
func (m *Metrics) PeerActivity(peerID string) {
select {
case m.peerActivityChan <- peerID:
default:
log.Errorf("peer activity channel is full, dropping activity metrics for peer %s", peerID)
}
}
func (m *Metrics) calculateActiveIdleConnections() (int64, int64) {
active, idle := int64(0), int64(0)
m.mutexActivity.Lock()
defer m.mutexActivity.Unlock()
for _, lastActive := range m.peerLastActive {
if time.Since(lastActive) > idleTimeout {
idle++
} else {
active++
}
}
return active, idle
}
func (m *Metrics) readPeerActivity() {
for {
select {
case peerID := <-m.peerActivityChan:
m.mutexActivity.Lock()
m.peerLastActive[peerID] = time.Now()
m.mutexActivity.Unlock()
case <-m.ctx.Done():
return
}
}
}

View File

@ -12,6 +12,7 @@ import (
"github.com/netbirdio/netbird/relay/healthcheck"
"github.com/netbirdio/netbird/relay/messages"
"github.com/netbirdio/netbird/relay/metrics"
)
const (
@ -20,23 +21,25 @@ const (
// Peer represents a peer connection
type Peer struct {
log *log.Entry
idS string
idB []byte
conn net.Conn
connMu sync.RWMutex
store *Store
metrics *metrics.Metrics
log *log.Entry
idS string
idB []byte
conn net.Conn
connMu sync.RWMutex
store *Store
}
// NewPeer creates a new Peer instance and prepare custom logging
func NewPeer(id []byte, conn net.Conn, store *Store) *Peer {
func NewPeer(metrics *metrics.Metrics, id []byte, conn net.Conn, store *Store) *Peer {
stringID := messages.HashIDToString(id)
return &Peer{
log: log.WithField("peer_id", stringID),
idS: stringID,
idB: id,
conn: conn,
store: store,
metrics: metrics,
log: log.WithField("peer_id", stringID),
idS: stringID,
idB: id,
conn: conn,
store: store,
}
}
@ -70,6 +73,8 @@ func (p *Peer) Work() {
case messages.MsgTypeHealthCheck:
hc.OnHCResponse()
case messages.MsgTypeTransport:
p.metrics.TransferBytesRecv.Add(ctx, int64(n))
p.metrics.PeerActivity(p.String())
p.handleTransportMsg(msg)
case messages.MsgTypeClose:
p.log.Infof("peer exited gracefully")
@ -167,8 +172,10 @@ func (p *Peer) handleTransportMsg(msg []byte) {
p.log.Errorf("failed to update transport message: %s", err)
return
}
_, err = dp.Write(msg)
n, err := dp.Write(msg)
if err != nil {
p.log.Errorf("failed to write transport message to: %s", dp.String())
return
}
p.metrics.TransferBytesSent.Add(context.Background(), int64(n))
}

View File

@ -17,8 +17,9 @@ import (
// Relay represents the relay server
type Relay struct {
metrics *metrics.Metrics
validator auth.Validator
metrics *metrics.Metrics
metricsCancel context.CancelFunc
validator auth.Validator
store *Store
instanceURL string
@ -43,15 +44,18 @@ type Relay struct {
// A pointer to a Relay instance and an error. If the Relay instance is successfully created, the error is nil.
// Otherwise, the error contains the details of what went wrong.
func NewRelay(meter metric.Meter, exposedAddress string, tlsSupport bool, validator auth.Validator) (*Relay, error) {
m, err := metrics.NewMetrics(meter)
ctx, metricsCancel := context.WithCancel(context.Background())
m, err := metrics.NewMetrics(ctx, meter)
if err != nil {
metricsCancel()
return nil, fmt.Errorf("creating app metrics: %v", err)
}
r := &Relay{
metrics: m,
validator: validator,
store: NewStore(),
metrics: m,
metricsCancel: metricsCancel,
validator: validator,
store: NewStore(),
}
if tlsSupport {
@ -85,15 +89,15 @@ func (r *Relay) Accept(conn net.Conn) {
return
}
peer := NewPeer(peerID, conn, r.store)
peer := NewPeer(r.metrics, peerID, conn, r.store)
peer.log.Infof("peer connected from: %s", conn.RemoteAddr())
r.store.AddPeer(peer)
r.metrics.Peers.Add(context.Background(), 1)
r.metrics.PeerConnected(peer.String())
go func() {
peer.Work()
r.store.DeletePeer(peer)
peer.log.Debugf("relay connection closed")
r.metrics.Peers.Add(context.Background(), -1)
r.metrics.PeerDisconnected(peer.String())
}()
}
@ -112,6 +116,7 @@ func (r *Relay) Close(ctx context.Context) {
}(peer)
}
wg.Wait()
r.metricsCancel()
r.closeMu.Unlock()
}