mirror of
https://github.com/netbirdio/netbird.git
synced 2025-06-20 01:38:41 +02:00
[signal] Use signal dispatcher (#2373)
This commit is contained in:
parent
5ac6f56594
commit
92a0092ad5
1
go.mod
1
go.mod
@ -58,6 +58,7 @@ require (
|
|||||||
github.com/mitchellh/hashstructure/v2 v2.0.2
|
github.com/mitchellh/hashstructure/v2 v2.0.2
|
||||||
github.com/nadoo/ipset v0.5.0
|
github.com/nadoo/ipset v0.5.0
|
||||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20240703085513-32605f7ffd8e
|
github.com/netbirdio/management-integrations/integrations v0.0.0-20240703085513-32605f7ffd8e
|
||||||
|
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20240820130728-bc0683599080
|
||||||
github.com/okta/okta-sdk-golang/v2 v2.18.0
|
github.com/okta/okta-sdk-golang/v2 v2.18.0
|
||||||
github.com/oschwald/maxminddb-golang v1.12.0
|
github.com/oschwald/maxminddb-golang v1.12.0
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
|
2
go.sum
2
go.sum
@ -477,6 +477,8 @@ github.com/netbirdio/management-integrations/integrations v0.0.0-20240703085513-
|
|||||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20240703085513-32605f7ffd8e/go.mod h1:nykwWZnxb+sJz2Z//CEq45CMRWSHllH8pODKRB8eY7Y=
|
github.com/netbirdio/management-integrations/integrations v0.0.0-20240703085513-32605f7ffd8e/go.mod h1:nykwWZnxb+sJz2Z//CEq45CMRWSHllH8pODKRB8eY7Y=
|
||||||
github.com/netbirdio/service v0.0.0-20230215170314-b923b89432b0 h1:hirFRfx3grVA/9eEyjME5/z3nxdJlN9kfQpvWWPk32g=
|
github.com/netbirdio/service v0.0.0-20230215170314-b923b89432b0 h1:hirFRfx3grVA/9eEyjME5/z3nxdJlN9kfQpvWWPk32g=
|
||||||
github.com/netbirdio/service v0.0.0-20230215170314-b923b89432b0/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
|
github.com/netbirdio/service v0.0.0-20230215170314-b923b89432b0/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
|
||||||
|
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20240820130728-bc0683599080 h1:mXJkoWLdqJTlkQ7DgQ536kcXHXIdUPeagkN8i4eFDdg=
|
||||||
|
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20240820130728-bc0683599080/go.mod h1:5/sjFmLb8O96B5737VCqhHyGRzNFIaN/Bu7ZodXc3qQ=
|
||||||
github.com/netbirdio/wireguard-go v0.0.0-20240105182236-6c340dd55aed h1:t0UADZUJDaaZgfKrt8JUPrOLL9Mg/ryjP85RAH53qgs=
|
github.com/netbirdio/wireguard-go v0.0.0-20240105182236-6c340dd55aed h1:t0UADZUJDaaZgfKrt8JUPrOLL9Mg/ryjP85RAH53qgs=
|
||||||
github.com/netbirdio/wireguard-go v0.0.0-20240105182236-6c340dd55aed/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA=
|
github.com/netbirdio/wireguard-go v0.0.0-20240105182236-6c340dd55aed/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA=
|
||||||
github.com/nicksnyder/go-i18n/v2 v2.4.0 h1:3IcvPOAvnCKwNm0TB0dLDTuawWEj+ax/RERNC+diLMM=
|
github.com/nicksnyder/go-i18n/v2 v2.4.0 h1:3IcvPOAvnCKwNm0TB0dLDTuawWEj+ax/RERNC+diLMM=
|
||||||
|
@ -18,8 +18,11 @@ type Peer struct {
|
|||||||
|
|
||||||
StreamID int64
|
StreamID int64
|
||||||
|
|
||||||
//a gRpc connection stream to the Peer
|
// a gRpc connection stream to the Peer
|
||||||
Stream proto.SignalExchange_ConnectStreamServer
|
Stream proto.SignalExchange_ConnectStreamServer
|
||||||
|
|
||||||
|
// registration time
|
||||||
|
RegisteredAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPeer creates a new instance of a connected Peer
|
// NewPeer creates a new instance of a connected Peer
|
||||||
@ -28,6 +31,7 @@ func NewPeer(id string, stream proto.SignalExchange_ConnectStreamServer) *Peer {
|
|||||||
Id: id,
|
Id: id,
|
||||||
Stream: stream,
|
Stream: stream,
|
||||||
StreamID: time.Now().UnixNano(),
|
StreamID: time.Now().UnixNano(),
|
||||||
|
RegisteredAt: time.Now(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,6 +13,8 @@ import (
|
|||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
|
"github.com/netbirdio/signal-dispatcher/dispatcher"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/signal/metrics"
|
"github.com/netbirdio/netbird/signal/metrics"
|
||||||
"github.com/netbirdio/netbird/signal/peer"
|
"github.com/netbirdio/netbird/signal/peer"
|
||||||
"github.com/netbirdio/netbird/signal/proto"
|
"github.com/netbirdio/netbird/signal/proto"
|
||||||
@ -40,7 +42,7 @@ const (
|
|||||||
type Server struct {
|
type Server struct {
|
||||||
registry *peer.Registry
|
registry *peer.Registry
|
||||||
proto.UnimplementedSignalExchangeServer
|
proto.UnimplementedSignalExchangeServer
|
||||||
|
dispatcher *dispatcher.Dispatcher
|
||||||
metrics *metrics.AppMetrics
|
metrics *metrics.AppMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,7 +53,13 @@ func NewServer(meter metric.Meter) (*Server, error) {
|
|||||||
return nil, fmt.Errorf("creating app metrics: %v", err)
|
return nil, fmt.Errorf("creating app metrics: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dispatcher, err := dispatcher.NewDispatcher()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("creating dispatcher: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
|
dispatcher: dispatcher,
|
||||||
registry: peer.NewRegistry(appMetrics),
|
registry: peer.NewRegistry(appMetrics),
|
||||||
metrics: appMetrics,
|
metrics: appMetrics,
|
||||||
}
|
}
|
||||||
@ -61,57 +69,31 @@ func NewServer(meter metric.Meter) (*Server, error) {
|
|||||||
|
|
||||||
// Send forwards a message to the signal peer
|
// Send forwards a message to the signal peer
|
||||||
func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
||||||
if !s.registry.IsPeerRegistered(msg.Key) {
|
log.Debugf("received a new message to send from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotRegistered)))
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("peer %s is not registered", msg.Key)
|
if msg.RemoteKey == "dummy" {
|
||||||
}
|
// Test message send during netbird status
|
||||||
|
|
||||||
getRegistrationStart := time.Now()
|
|
||||||
|
|
||||||
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
|
|
||||||
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
|
|
||||||
start := time.Now()
|
|
||||||
//forward the message to the target peer
|
|
||||||
if err := dstPeer.Stream.Send(msg); err != nil {
|
|
||||||
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
|
|
||||||
//todo respond to the sender?
|
|
||||||
|
|
||||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
|
|
||||||
} else {
|
|
||||||
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage)))
|
|
||||||
s.metrics.MessagesForwarded.Add(context.Background(), 1)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage), attribute.String(labelRegistrationStatus, labelRegistrationNotFound)))
|
|
||||||
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
|
|
||||||
//todo respond to the sender?
|
|
||||||
|
|
||||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
|
|
||||||
}
|
|
||||||
return &proto.EncryptedMessage{}, nil
|
return &proto.EncryptedMessage{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, found := s.registry.Get(msg.RemoteKey); found {
|
||||||
|
s.forwardMessageToPeer(ctx, msg)
|
||||||
|
return &proto.EncryptedMessage{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.dispatcher.SendMessage(context.Background(), msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectStream connects to the exchange stream
|
// ConnectStream connects to the exchange stream
|
||||||
func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error {
|
func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error {
|
||||||
p, err := s.connectPeer(stream)
|
p, err := s.RegisterPeer(stream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
startRegister := time.Now()
|
defer s.DeregisterPeer(p)
|
||||||
|
|
||||||
s.metrics.ActivePeers.Add(stream.Context(), 1)
|
// needed to confirm that the peer has been registered so that the client can proceed
|
||||||
|
|
||||||
defer func() {
|
|
||||||
log.Infof("peer disconnected [%s] [streamID %d] ", p.Id, p.StreamID)
|
|
||||||
s.registry.Deregister(p)
|
|
||||||
|
|
||||||
s.metrics.PeerConnectionDuration.Record(stream.Context(), int64(time.Since(startRegister).Seconds()))
|
|
||||||
s.metrics.ActivePeers.Add(context.Background(), -1)
|
|
||||||
}()
|
|
||||||
|
|
||||||
//needed to confirm that the peer has been registered so that the client can proceed
|
|
||||||
header := metadata.Pairs(proto.HeaderRegistered, "1")
|
header := metadata.Pairs(proto.HeaderRegistered, "1")
|
||||||
err = stream.SendHeader(header)
|
err = stream.SendHeader(header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -119,11 +101,10 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("peer connected [%s] [streamID %d] ", p.Id, p.StreamID)
|
log.Debugf("peer connected [%s] [streamID %d] ", p.Id, p.StreamID)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
// read incoming messages
|
||||||
//read incoming messages
|
|
||||||
msg, err := stream.Recv()
|
msg, err := stream.Recv()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
break
|
break
|
||||||
@ -131,44 +112,28 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey)
|
log.Debugf("Received a response from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||||
|
|
||||||
getRegistrationStart := time.Now()
|
_, err = s.dispatcher.SendMessage(stream.Context(), msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf("error while sending message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// lookup the target peer where the message is going to
|
|
||||||
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
|
|
||||||
s.metrics.GetRegistrationDelay.Record(stream.Context(), float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
|
|
||||||
start := time.Now()
|
|
||||||
//forward the message to the target peer
|
|
||||||
if err := dstPeer.Stream.Send(msg); err != nil {
|
|
||||||
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err)
|
|
||||||
//todo respond to the sender?
|
|
||||||
s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
|
|
||||||
} else {
|
|
||||||
// in milliseconds
|
|
||||||
s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
|
|
||||||
s.metrics.MessagesForwarded.Add(stream.Context(), 1)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
s.metrics.GetRegistrationDelay.Record(stream.Context(), float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationNotFound)))
|
|
||||||
s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
|
|
||||||
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)
|
|
||||||
//todo respond to the sender?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
<-stream.Context().Done()
|
<-stream.Context().Done()
|
||||||
return stream.Context().Err()
|
return stream.Context().Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handles initial Peer connection.
|
func (s *Server) RegisterPeer(stream proto.SignalExchange_ConnectStreamServer) (*peer.Peer, error) {
|
||||||
// Each connection must provide an Id header.
|
log.Debugf("registering new peer")
|
||||||
// At this moment the connecting Peer will be registered in the peer.Registry
|
|
||||||
func (s Server) connectPeer(stream proto.SignalExchange_ConnectStreamServer) (*peer.Peer, error) {
|
|
||||||
if meta, hasMeta := metadata.FromIncomingContext(stream.Context()); hasMeta {
|
if meta, hasMeta := metadata.FromIncomingContext(stream.Context()); hasMeta {
|
||||||
if id, found := meta[proto.HeaderId]; found {
|
if id, found := meta[proto.HeaderId]; found {
|
||||||
p := peer.NewPeer(id[0], stream)
|
p := peer.NewPeer(id[0], stream)
|
||||||
|
|
||||||
s.registry.Register(p)
|
s.registry.Register(p)
|
||||||
|
s.dispatcher.ListenForMessages(stream.Context(), p.Id, s.forwardMessageToPeer)
|
||||||
|
|
||||||
|
s.metrics.ActivePeers.Add(stream.Context(), 1)
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
} else {
|
} else {
|
||||||
@ -180,3 +145,38 @@ func (s Server) connectPeer(stream proto.SignalExchange_ConnectStreamServer) (*p
|
|||||||
return nil, status.Errorf(codes.FailedPrecondition, "missing connection stream meta")
|
return nil, status.Errorf(codes.FailedPrecondition, "missing connection stream meta")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) DeregisterPeer(p *peer.Peer) {
|
||||||
|
log.Debugf("peer disconnected [%s] [streamID %d] ", p.Id, p.StreamID)
|
||||||
|
s.registry.Deregister(p)
|
||||||
|
|
||||||
|
s.metrics.PeerConnectionDuration.Record(p.Stream.Context(), int64(time.Since(p.RegisteredAt).Seconds()))
|
||||||
|
s.metrics.ActivePeers.Add(context.Background(), -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) {
|
||||||
|
log.Debugf("forwarding a new message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||||
|
|
||||||
|
getRegistrationStart := time.Now()
|
||||||
|
|
||||||
|
// lookup the target peer where the message is going to
|
||||||
|
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
|
||||||
|
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
|
||||||
|
start := time.Now()
|
||||||
|
// forward the message to the target peer
|
||||||
|
if err := dstPeer.Stream.Send(msg); err != nil {
|
||||||
|
log.Warnf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
|
||||||
|
// todo respond to the sender?
|
||||||
|
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
|
||||||
|
} else {
|
||||||
|
// in milliseconds
|
||||||
|
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
|
||||||
|
s.metrics.MessagesForwarded.Add(ctx, 1)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationNotFound)))
|
||||||
|
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
|
||||||
|
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
|
||||||
|
// todo respond to the sender?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user