fix forwarded metrics (#2273)

This commit is contained in:
Maycon Santos 2024-07-16 10:14:30 +02:00 committed by GitHub
parent 1537b0f5e7
commit 88d1c5a0fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -23,6 +23,8 @@ const (
labelTypeError = "error" labelTypeError = "error"
labelTypeNotConnected = "not_connected" labelTypeNotConnected = "not_connected"
labelTypeNotRegistered = "not_registered" labelTypeNotRegistered = "not_registered"
labelTypeStream = "stream"
labelTypeMessage = "message"
labelError = "error" labelError = "error"
labelErrorMissingId = "missing_id" labelErrorMissingId = "missing_id"
@ -62,6 +64,7 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.
} }
if dstPeer, found := s.registry.Get(msg.RemoteKey); found { if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
start := time.Now()
//forward the message to the target peer //forward the message to the target peer
if err := dstPeer.Stream.Send(msg); err != nil { if err := dstPeer.Stream.Send(msg); err != nil {
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err) log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
@ -69,6 +72,7 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
} else { } else {
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage)))
s.metrics.MessagesForwarded.Add(context.Background(), 1) s.metrics.MessagesForwarded.Add(context.Background(), 1)
} }
} else { } else {
@ -118,22 +122,21 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
} else if err != nil { } else if err != nil {
return err return err
} }
start := time.Now()
log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey) log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey)
// lookup the target peer where the message is going to // lookup the target peer where the message is going to
if dstPeer, found := s.registry.Get(msg.RemoteKey); found { if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
start := time.Now()
//forward the message to the target peer //forward the message to the target peer
if err := dstPeer.Stream.Send(msg); err != nil { if err := dstPeer.Stream.Send(msg); err != nil {
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err) log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err)
//todo respond to the sender? //todo respond to the sender?
// in milliseconds
s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6)
s.metrics.MessagesForwarded.Add(stream.Context(), 1)
} else {
s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
} else {
// in milliseconds
s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
s.metrics.MessagesForwarded.Add(stream.Context(), 1)
} }
} else { } else {
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey) log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)