netbird/relay/metrics/realy.go

177 lines
4.0 KiB
Go
Raw Permalink Normal View History

package metrics
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/metric"
)
// idleTimeout is how long a peer may go without recorded activity before it
// is counted as idle instead of active.
const idleTimeout = 30 * time.Second
// Metrics bundles the relay's metric instruments with the bookkeeping needed
// to report how many connected peers are currently active or idle.
type Metrics struct {
	// Meter is the meter all instruments were created from.
	metric.Meter

	// Counters created as relay_transfer_sent_bytes_total and
	// relay_transfer_received_bytes_total.
	TransferBytesSent metric.Int64Counter
	TransferBytesRecv metric.Int64Counter

	// Histograms recorded in milliseconds by RecordAuthenticationTime and
	// RecordPeerStoreTime.
	AuthenticationTime metric.Float64Histogram
	PeerStoreTime      metric.Float64Histogram

	// peers is incremented by PeerConnected and decremented by
	// PeerDisconnected.
	peers metric.Int64UpDownCounter

	// ctx is passed to every instrument call; its cancellation stops the
	// goroutine that drains peerActivityChan.
	ctx context.Context

	// peerActivityChan carries peer IDs from PeerActivity to the reader
	// goroutine.
	peerActivityChan chan string

	// mutexActivity guards peerLastActive.
	mutexActivity sync.Mutex
	// peerLastActive maps a peer ID to the time of its last recorded
	// activity; the zero time means no activity has been seen yet.
	peerLastActive map[string]time.Time
}
// NewMetrics creates the relay instruments on meter, registers the callback
// that reports the active and idle peer gauges, and starts the goroutine that
// consumes peer activity events until ctx is done.
//
// It returns the first error reported by meter while creating an instrument
// or registering the callback; in that case the returned *Metrics is nil.
func NewMetrics(ctx context.Context, meter metric.Meter) (*Metrics, error) {
	sentCounter, err := meter.Int64Counter("relay_transfer_sent_bytes_total")
	if err != nil {
		return nil, err
	}

	recvCounter, err := meter.Int64Counter("relay_transfer_received_bytes_total")
	if err != nil {
		return nil, err
	}

	peerCounter, err := meter.Int64UpDownCounter("relay_peers")
	if err != nil {
		return nil, err
	}

	activeGauge, err := meter.Int64ObservableGauge("relay_peers_active")
	if err != nil {
		return nil, err
	}

	idleGauge, err := meter.Int64ObservableGauge("relay_peers_idle")
	if err != nil {
		return nil, err
	}

	authHistogram, err := meter.Float64Histogram(
		"relay_peer_authentication_time_milliseconds",
		metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
	)
	if err != nil {
		return nil, err
	}

	storeHistogram, err := meter.Float64Histogram(
		"relay_peer_store_time_milliseconds",
		metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...),
	)
	if err != nil {
		return nil, err
	}

	result := &Metrics{
		Meter:              meter,
		TransferBytesSent:  sentCounter,
		TransferBytesRecv:  recvCounter,
		AuthenticationTime: authHistogram,
		PeerStoreTime:      storeHistogram,
		peers:              peerCounter,
		ctx:                ctx,
		peerActivityChan:   make(chan string, 10),
		peerLastActive:     make(map[string]time.Time),
	}

	// Both gauges are derived from the same snapshot of the activity map, so
	// they are observed together in a single callback.
	observePeers := func(_ context.Context, observer metric.Observer) error {
		activeCount, idleCount := result.calculateActiveIdleConnections()
		observer.ObserveInt64(activeGauge, activeCount)
		observer.ObserveInt64(idleGauge, idleCount)
		return nil
	}
	if _, err := meter.RegisterCallback(observePeers, activeGauge, idleGauge); err != nil {
		return nil, err
	}

	// The reader goroutine exits when ctx is done.
	go result.readPeerActivity()

	return result, nil
}
// PeerConnected increments the connected-peer counter and registers the peer
// with a zero last-activity time, so it is counted as idle until activity is
// recorded for it.
func (m *Metrics) PeerConnected(id string) {
	m.peers.Add(m.ctx, 1)

	// The zero time is always older than idleTimeout.
	var neverActive time.Time

	m.mutexActivity.Lock()
	defer m.mutexActivity.Unlock()
	m.peerLastActive[id] = neverActive
}
// RecordAuthenticationTime records the time taken for peer authentication in
// the AuthenticationTime histogram, expressed in fractional milliseconds.
func (m *Metrics) RecordAuthenticationTime(duration time.Duration) {
	elapsedMs := float64(duration) / float64(time.Millisecond)
	m.AuthenticationTime.Record(m.ctx, elapsedMs)
}
// RecordPeerStoreTime records the time taken to store the peer in the map in
// the PeerStoreTime histogram, expressed in fractional milliseconds.
func (m *Metrics) RecordPeerStoreTime(duration time.Duration) {
	elapsedMs := float64(duration) / float64(time.Millisecond)
	m.PeerStoreTime.Record(m.ctx, elapsedMs)
}
// PeerDisconnected decrements the connected-peer counter and forgets the
// peer's last-activity entry, so it no longer contributes to either the
// active or the idle count.
func (m *Metrics) PeerDisconnected(id string) {
	m.peers.Add(m.ctx, -1)

	m.mutexActivity.Lock()
	delete(m.peerLastActive, id)
	m.mutexActivity.Unlock()
}
// PeerActivity increases the active connections
func (m *Metrics) PeerActivity(peerID string) {
select {
case m.peerActivityChan <- peerID:
default:
2024-09-11 18:36:19 +02:00
log.Tracef("peer activity channel is full, dropping activity metrics for peer %s", peerID)
}
}
// calculateActiveIdleConnections returns the number of active peers followed
// by the number of idle peers. A peer is idle when its last recorded activity
// is more than idleTimeout ago; every other tracked peer is active.
func (m *Metrics) calculateActiveIdleConnections() (int64, int64) {
	m.mutexActivity.Lock()
	defer m.mutexActivity.Unlock()

	var idleCount int64
	for _, lastSeen := range m.peerLastActive {
		if time.Since(lastSeen) > idleTimeout {
			idleCount++
		}
	}

	// Every tracked peer that is not idle is active.
	activeCount := int64(len(m.peerLastActive)) - idleCount
	return activeCount, idleCount
}
// readPeerActivity drains the activity channel and stamps each reported peer
// with the current time. It returns when m.ctx is done.
//
// Only peers that are still present in peerLastActive are refreshed. An
// activity event can still be queued when PeerDisconnected removes the peer;
// writing it unconditionally would re-insert the entry, and nothing would
// delete it again, so the active/idle gauges would over-count and the map
// would grow.
func (m *Metrics) readPeerActivity() {
	for {
		select {
		case peerID := <-m.peerActivityChan:
			m.mutexActivity.Lock()
			if _, tracked := m.peerLastActive[peerID]; tracked {
				m.peerLastActive[peerID] = time.Now()
			}
			m.mutexActivity.Unlock()
		case <-m.ctx.Done():
			return
		}
	}
}
// getStandardBucketBoundaries returns the explicit histogram bucket
// boundaries used for the timing histograms: a 1-5 progression per decade
// from 0.1 up to 10000. A new slice is returned on every call.
func getStandardBucketBoundaries() []float64 {
	boundaries := []float64{
		0.1, 0.5,
		1, 5,
		10, 50,
		100, 500,
		1000, 5000,
		10000,
	}
	return boundaries
}