[signal] remove stream receive server side (#3820)

This commit is contained in:
Pascal Fischer
2025-05-14 19:28:51 +02:00
committed by GitHub
parent 92c91bbdd8
commit e520b64c6d
3 changed files with 17 additions and 31 deletions

View File

@ -3,7 +3,6 @@ package server
import (
"context"
"fmt"
"io"
"time"
log "github.com/sirupsen/logrus"
@ -29,10 +28,11 @@ const (
labelTypeStream = "stream"
labelTypeMessage = "message"
labelError = "error"
labelErrorMissingId = "missing_id"
labelErrorMissingMeta = "missing_meta"
labelErrorFailedHeader = "failed_header"
labelError = "error"
labelErrorMissingId = "missing_id"
labelErrorMissingMeta = "missing_meta"
labelErrorFailedHeader = "failed_header"
labelErrorFailedRegistration = "failed_registration"
labelRegistrationStatus = "status"
labelRegistrationFound = "found"
@ -99,28 +99,9 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
log.Debugf("peer connected [%s] [streamID %d] ", p.Id, p.StreamID)
for {
select {
case <-stream.Context().Done():
log.Debugf("stream closed for peer [%s] [streamID %d] due to context cancellation", p.Id, p.StreamID)
return stream.Context().Err()
default:
// read incoming messages
msg, err := stream.Recv()
if err == io.EOF {
break
} else if err != nil {
return err
}
log.Tracef("Received a response from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
_, err = s.dispatcher.SendMessage(stream.Context(), msg)
if err != nil {
log.Debugf("error while sending message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
}
}
}
<-stream.Context().Done()
log.Debugf("peer stream closing [%s] [streamID %d] ", p.Id, p.StreamID)
return nil
}
func (s *Server) RegisterPeer(stream proto.SignalExchange_ConnectStreamServer) (*peer.Peer, error) {
@ -139,7 +120,12 @@ func (s *Server) RegisterPeer(stream proto.SignalExchange_ConnectStreamServer) (
p := peer.NewPeer(id[0], stream)
s.registry.Register(p)
s.dispatcher.ListenForMessages(stream.Context(), p.Id, s.forwardMessageToPeer)
err := s.dispatcher.ListenForMessages(stream.Context(), p.Id, s.forwardMessageToPeer)
if err != nil {
s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorFailedRegistration)))
log.Errorf("error while registering message listener for peer [%s] %v", p.Id, err)
return nil, status.Errorf(codes.Internal, "error while registering message listener")
}
return p, nil
}