[client] Fix/ice handshake (#4281)

In this PR, speed up the GRPC message processing, force the recreation of the ICE agent when getting a new, remote offer (do not wait for local STUN timeout).
This commit is contained in:
Zoltan Papp
2025-08-18 20:09:50 +02:00
committed by GitHub
parent 6a3846a8b7
commit 12cad854b2
13 changed files with 563 additions and 357 deletions

View File

@@ -55,11 +55,11 @@ import (
nbssh "github.com/netbirdio/netbird/client/ssh" nbssh "github.com/netbirdio/netbird/client/ssh"
"github.com/netbirdio/netbird/client/system" "github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns" nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/route"
mgm "github.com/netbirdio/netbird/shared/management/client" mgm "github.com/netbirdio/netbird/shared/management/client"
mgmProto "github.com/netbirdio/netbird/shared/management/proto" mgmProto "github.com/netbirdio/netbird/shared/management/proto"
auth "github.com/netbirdio/netbird/shared/relay/auth/hmac" auth "github.com/netbirdio/netbird/shared/relay/auth/hmac"
relayClient "github.com/netbirdio/netbird/shared/relay/client" relayClient "github.com/netbirdio/netbird/shared/relay/client"
"github.com/netbirdio/netbird/route"
signal "github.com/netbirdio/netbird/shared/signal/client" signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto" sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/util"
@@ -254,6 +254,7 @@ func NewEngine(
} }
engine.stateManager = statemanager.New(path) engine.stateManager = statemanager.New(path)
log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String())
return engine return engine
} }
@@ -1330,52 +1331,17 @@ func (e *Engine) receiveSignalEvents() {
} }
switch msg.GetBody().Type { switch msg.GetBody().Type {
case sProto.Body_OFFER: case sProto.Body_OFFER, sProto.Body_ANSWER:
remoteCred, err := signal.UnMarshalCredential(msg) offerAnswer, err := convertToOfferAnswer(msg)
if err != nil { if err != nil {
return err return err
} }
var rosenpassPubKey []byte if msg.Body.Type == sProto.Body_OFFER {
rosenpassAddr := "" conn.OnRemoteOffer(*offerAnswer)
if msg.GetBody().GetRosenpassConfig() != nil { } else {
rosenpassPubKey = msg.GetBody().GetRosenpassConfig().GetRosenpassPubKey() conn.OnRemoteAnswer(*offerAnswer)
rosenpassAddr = msg.GetBody().GetRosenpassConfig().GetRosenpassServerAddr()
} }
conn.OnRemoteOffer(peer.OfferAnswer{
IceCredentials: peer.IceCredentials{
UFrag: remoteCred.UFrag,
Pwd: remoteCred.Pwd,
},
WgListenPort: int(msg.GetBody().GetWgListenPort()),
Version: msg.GetBody().GetNetBirdVersion(),
RosenpassPubKey: rosenpassPubKey,
RosenpassAddr: rosenpassAddr,
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
})
case sProto.Body_ANSWER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
}
var rosenpassPubKey []byte
rosenpassAddr := ""
if msg.GetBody().GetRosenpassConfig() != nil {
rosenpassPubKey = msg.GetBody().GetRosenpassConfig().GetRosenpassPubKey()
rosenpassAddr = msg.GetBody().GetRosenpassConfig().GetRosenpassServerAddr()
}
conn.OnRemoteAnswer(peer.OfferAnswer{
IceCredentials: peer.IceCredentials{
UFrag: remoteCred.UFrag,
Pwd: remoteCred.Pwd,
},
WgListenPort: int(msg.GetBody().GetWgListenPort()),
Version: msg.GetBody().GetNetBirdVersion(),
RosenpassPubKey: rosenpassPubKey,
RosenpassAddr: rosenpassAddr,
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
})
case sProto.Body_CANDIDATE: case sProto.Body_CANDIDATE:
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload) candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
if err != nil { if err != nil {
@@ -2073,3 +2039,44 @@ func createFile(path string) error {
} }
return file.Close() return file.Close()
} }
func convertToOfferAnswer(msg *sProto.Message) (*peer.OfferAnswer, error) {
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return nil, err
}
var (
rosenpassPubKey []byte
rosenpassAddr string
)
if cfg := msg.GetBody().GetRosenpassConfig(); cfg != nil {
rosenpassPubKey = cfg.GetRosenpassPubKey()
rosenpassAddr = cfg.GetRosenpassServerAddr()
}
// Handle optional SessionID
var sessionID *peer.ICESessionID
if sessionBytes := msg.GetBody().GetSessionId(); sessionBytes != nil {
if id, err := peer.ICESessionIDFromBytes(sessionBytes); err != nil {
log.Warnf("Invalid session ID in message: %v", err)
sessionID = nil // Set to nil if conversion fails
} else {
sessionID = &id
}
}
offerAnswer := peer.OfferAnswer{
IceCredentials: peer.IceCredentials{
UFrag: remoteCred.UFrag,
Pwd: remoteCred.Pwd,
},
WgListenPort: int(msg.GetBody().GetWgListenPort()),
Version: msg.GetBody().GetNetBirdVersion(),
RosenpassPubKey: rosenpassPubKey,
RosenpassAddr: rosenpassAddr,
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
SessionID: sessionID,
}
return &offerAnswer, nil
}

View File

@@ -24,8 +24,8 @@ import (
"github.com/netbirdio/netbird/client/internal/peer/id" "github.com/netbirdio/netbird/client/internal/peer/id"
"github.com/netbirdio/netbird/client/internal/peer/worker" "github.com/netbirdio/netbird/client/internal/peer/worker"
"github.com/netbirdio/netbird/client/internal/stdnet" "github.com/netbirdio/netbird/client/internal/stdnet"
relayClient "github.com/netbirdio/netbird/shared/relay/client"
"github.com/netbirdio/netbird/route" "github.com/netbirdio/netbird/route"
relayClient "github.com/netbirdio/netbird/shared/relay/client"
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
) )
@@ -200,19 +200,11 @@ func (conn *Conn) Open(engineCtx context.Context) error {
conn.wg.Add(1) conn.wg.Add(1)
go func() { go func() {
defer conn.wg.Done() defer conn.wg.Done()
conn.waitInitialRandomSleepTime(conn.ctx) conn.waitInitialRandomSleepTime(conn.ctx)
conn.semaphore.Done(conn.ctx) conn.semaphore.Done(conn.ctx)
conn.dumpState.SendOffer()
if err := conn.handshaker.sendOffer(); err != nil {
conn.Log.Errorf("failed to send initial offer: %v", err)
}
conn.wg.Add(1)
go func() {
conn.guard.Start(conn.ctx, conn.onGuardEvent) conn.guard.Start(conn.ctx, conn.onGuardEvent)
conn.wg.Done()
}()
}() }()
conn.opened = true conn.opened = true
return nil return nil
@@ -274,10 +266,10 @@ func (conn *Conn) Close(signalToRemote bool) {
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
// doesn't block, discards the message if connection wasn't ready // doesn't block, discards the message if connection wasn't ready
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool { func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) {
conn.dumpState.RemoteAnswer() conn.dumpState.RemoteAnswer()
conn.Log.Infof("OnRemoteAnswer, priority: %s, status ICE: %s, status relay: %s", conn.currentConnPriority, conn.statusICE, conn.statusRelay) conn.Log.Infof("OnRemoteAnswer, priority: %s, status ICE: %s, status relay: %s", conn.currentConnPriority, conn.statusICE, conn.statusRelay)
return conn.handshaker.OnRemoteAnswer(answer) conn.handshaker.OnRemoteAnswer(answer)
} }
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
@@ -296,10 +288,10 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
conn.onDisconnected = handler conn.onDisconnected = handler
} }
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool { func (conn *Conn) OnRemoteOffer(offer OfferAnswer) {
conn.dumpState.RemoteOffer() conn.dumpState.RemoteOffer()
conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay) conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
return conn.handshaker.OnRemoteOffer(offer) conn.handshaker.OnRemoteOffer(offer)
} }
// WgConfig returns the WireGuard config // WgConfig returns the WireGuard config
@@ -548,7 +540,6 @@ func (conn *Conn) onRelayDisconnected() {
} }
func (conn *Conn) onGuardEvent() { func (conn *Conn) onGuardEvent() {
conn.Log.Debugf("send offer to peer")
conn.dumpState.SendOffer() conn.dumpState.SendOffer()
if err := conn.handshaker.SendOffer(); err != nil { if err := conn.handshaker.SendOffer(); err != nil {
conn.Log.Errorf("failed to send offer: %v", err) conn.Log.Errorf("failed to send offer: %v", err)
@@ -672,7 +663,7 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
} }
}() }()
if conn.statusICE.Get() == worker.StatusDisconnected { if conn.statusICE.Get() == worker.StatusDisconnected && !conn.workerICE.InProgress() {
return false return false
} }

View File

@@ -1,9 +1,9 @@
package peer package peer
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"sync"
"testing" "testing"
"time" "time"
@@ -79,16 +79,13 @@ func TestConn_OnRemoteOffer(t *testing.T) {
return return
} }
wg := sync.WaitGroup{} onNewOffeChan := make(chan struct{})
wg.Add(2)
go func() {
<-conn.handshaker.remoteOffersCh
wg.Done()
}()
go func() { conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) {
for { onNewOffeChan <- struct{}{}
accepted := conn.OnRemoteOffer(OfferAnswer{ })
conn.OnRemoteOffer(OfferAnswer{
IceCredentials: IceCredentials{ IceCredentials: IceCredentials{
UFrag: "test", UFrag: "test",
Pwd: "test", Pwd: "test",
@@ -96,14 +93,16 @@ func TestConn_OnRemoteOffer(t *testing.T) {
WgListenPort: 0, WgListenPort: 0,
Version: "", Version: "",
}) })
if accepted {
wg.Done()
return
}
}
}()
wg.Wait() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
select {
case <-onNewOffeChan:
// success
case <-ctx.Done():
t.Error("expected to receive a new offer notification, but timed out")
}
} }
func TestConn_OnRemoteAnswer(t *testing.T) { func TestConn_OnRemoteAnswer(t *testing.T) {
@@ -119,16 +118,13 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
return return
} }
wg := sync.WaitGroup{} onNewOffeChan := make(chan struct{})
wg.Add(2)
go func() {
<-conn.handshaker.remoteAnswerCh
wg.Done()
}()
go func() { conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) {
for { onNewOffeChan <- struct{}{}
accepted := conn.OnRemoteAnswer(OfferAnswer{ })
conn.OnRemoteAnswer(OfferAnswer{
IceCredentials: IceCredentials{ IceCredentials: IceCredentials{
UFrag: "test", UFrag: "test",
Pwd: "test", Pwd: "test",
@@ -136,14 +132,15 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
WgListenPort: 0, WgListenPort: 0,
Version: "", Version: "",
}) })
if accepted { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
wg.Done() defer cancel()
return
}
}
}()
wg.Wait() select {
case <-onNewOffeChan:
// success
case <-ctx.Done():
t.Error("expected to receive a new offer notification, but timed out")
}
} }
func TestConn_presharedKey(t *testing.T) { func TestConn_presharedKey(t *testing.T) {

View File

@@ -19,7 +19,6 @@ type isConnectedFunc func() bool
// - Relayed connection disconnected // - Relayed connection disconnected
// - ICE candidate changes // - ICE candidate changes
type Guard struct { type Guard struct {
Reconnect chan struct{}
log *log.Entry log *log.Entry
isConnectedOnAllWay isConnectedFunc isConnectedOnAllWay isConnectedFunc
timeout time.Duration timeout time.Duration
@@ -30,7 +29,6 @@ type Guard struct {
func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard { func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
return &Guard{ return &Guard{
Reconnect: make(chan struct{}, 1),
log: log, log: log,
isConnectedOnAllWay: isConnectedFn, isConnectedOnAllWay: isConnectedFn,
timeout: timeout, timeout: timeout,
@@ -41,6 +39,7 @@ func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Durati
} }
func (g *Guard) Start(ctx context.Context, eventCallback func()) { func (g *Guard) Start(ctx context.Context, eventCallback func()) {
g.log.Infof("starting guard for reconnection with MaxInterval: %s", g.timeout)
g.reconnectLoopWithRetry(ctx, eventCallback) g.reconnectLoopWithRetry(ctx, eventCallback)
} }
@@ -61,17 +60,14 @@ func (g *Guard) SetICEConnDisconnected() {
// reconnectLoopWithRetry periodically check the connection status. // reconnectLoopWithRetry periodically check the connection status.
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported // Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) { func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
waitForInitialConnectionTry(ctx)
srReconnectedChan := g.srWatcher.NewListener() srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan) defer g.srWatcher.RemoveListener(srReconnectedChan)
ticker := g.prepareExponentTicker(ctx) ticker := g.initialTicker(ctx)
defer ticker.Stop() defer ticker.Stop()
tickerChannel := ticker.C tickerChannel := ticker.C
g.log.Infof("start reconnect loop...")
for { for {
select { select {
case t := <-tickerChannel: case t := <-tickerChannel:
@@ -85,7 +81,6 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
if !g.isConnectedOnAllWay() { if !g.isConnectedOnAllWay() {
callback() callback()
} }
case <-g.relayedConnDisconnected: case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, reset reconnection ticker") g.log.Debugf("Relay connection changed, reset reconnection ticker")
ticker.Stop() ticker.Stop()
@@ -111,6 +106,20 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
} }
} }
// initialTicker give chance to the peer to establish the initial connection.
func (g *Guard) initialTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 3 * time.Second,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
return backoff.NewTicker(bo)
}
func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker { func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{ bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond, InitialInterval: 800 * time.Millisecond,
@@ -126,13 +135,3 @@ func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
return ticker return ticker
} }
// Give chance to the peer to establish the initial connection.
// With it, we can decrease to send necessary offer
func waitForInitialConnectionTry(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(3 * time.Second):
}
}

View File

@@ -39,6 +39,15 @@ type OfferAnswer struct {
// relay server address // relay server address
RelaySrvAddress string RelaySrvAddress string
// SessionID is the unique identifier of the session, used to discard old messages
SessionID *ICESessionID
}
func (oa *OfferAnswer) SessionIDString() string {
if oa.SessionID == nil {
return "unknown"
}
return oa.SessionID.String()
} }
type Handshaker struct { type Handshaker struct {
@@ -74,21 +83,25 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn
func (h *Handshaker) Listen(ctx context.Context) { func (h *Handshaker) Listen(ctx context.Context) {
for { for {
h.log.Info("wait for remote offer confirmation") select {
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation(ctx) case remoteOfferAnswer := <-h.remoteOffersCh:
if err != nil { // received confirmation from the remote peer -> ready to proceed
var connectionClosedError *ConnectionClosedError if err := h.sendAnswer(); err != nil {
if errors.As(err, &connectionClosedError) { h.log.Errorf("failed to send remote offer confirmation: %s", err)
h.log.Info("exit from handshaker")
return
}
h.log.Errorf("failed to received remote offer confirmation: %s", err)
continue continue
} }
h.log.Infof("received connection confirmation, running version %s and with remote WireGuard listen port %d", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
for _, listener := range h.onNewOfferListeners { for _, listener := range h.onNewOfferListeners {
go listener(remoteOfferAnswer) listener(&remoteOfferAnswer)
}
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
case remoteOfferAnswer := <-h.remoteAnswerCh:
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
for _, listener := range h.onNewOfferListeners {
listener(&remoteOfferAnswer)
}
case <-ctx.Done():
h.log.Infof("stop listening for remote offers and answers")
return
} }
} }
} }
@@ -101,43 +114,27 @@ func (h *Handshaker) SendOffer() error {
// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
// doesn't block, discards the message if connection wasn't ready // doesn't block, discards the message if connection wasn't ready
func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) bool { func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) {
select { select {
case h.remoteOffersCh <- offer: case h.remoteOffersCh <- offer:
return true return
default: default:
h.log.Warnf("OnRemoteOffer skipping message because is not ready") h.log.Warnf("skipping remote offer message because receiver not ready")
// connection might not be ready yet to receive so we ignore the message // connection might not be ready yet to receive so we ignore the message
return false return
} }
} }
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
// doesn't block, discards the message if connection wasn't ready // doesn't block, discards the message if connection wasn't ready
func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool { func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) {
select { select {
case h.remoteAnswerCh <- answer: case h.remoteAnswerCh <- answer:
return true return
default: default:
// connection might not be ready yet to receive so we ignore the message // connection might not be ready yet to receive so we ignore the message
h.log.Debugf("OnRemoteAnswer skipping message because is not ready") h.log.Warnf("skipping remote answer message because receiver not ready")
return false return
}
}
func (h *Handshaker) waitForRemoteOfferConfirmation(ctx context.Context) (*OfferAnswer, error) {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
// received confirmation from the remote peer -> ready to proceed
if err := h.sendAnswer(); err != nil {
return nil, err
}
return &remoteOfferAnswer, nil
case remoteOfferAnswer := <-h.remoteAnswerCh:
return &remoteOfferAnswer, nil
case <-ctx.Done():
// closed externally
return nil, NewConnectionClosedError(h.config.Key)
} }
} }
@@ -147,43 +144,34 @@ func (h *Handshaker) sendOffer() error {
return ErrSignalIsNotReady return ErrSignalIsNotReady
} }
iceUFrag, icePwd := h.ice.GetLocalUserCredentials() offer := h.buildOfferAnswer()
offer := OfferAnswer{ h.log.Infof("sending offer with serial: %s", offer.SessionIDString())
IceCredentials: IceCredentials{iceUFrag, icePwd},
WgListenPort: h.config.LocalWgPort,
Version: version.NetbirdVersion(),
RosenpassPubKey: h.config.RosenpassConfig.PubKey,
RosenpassAddr: h.config.RosenpassConfig.Addr,
}
addr, err := h.relay.RelayInstanceAddress()
if err == nil {
offer.RelaySrvAddress = addr
}
return h.signaler.SignalOffer(offer, h.config.Key) return h.signaler.SignalOffer(offer, h.config.Key)
} }
func (h *Handshaker) sendAnswer() error { func (h *Handshaker) sendAnswer() error {
h.log.Infof("sending answer") answer := h.buildOfferAnswer()
uFrag, pwd := h.ice.GetLocalUserCredentials() h.log.Infof("sending answer with serial: %s", answer.SessionIDString())
return h.signaler.SignalAnswer(answer, h.config.Key)
}
func (h *Handshaker) buildOfferAnswer() OfferAnswer {
uFrag, pwd := h.ice.GetLocalUserCredentials()
sid := h.ice.SessionID()
answer := OfferAnswer{ answer := OfferAnswer{
IceCredentials: IceCredentials{uFrag, pwd}, IceCredentials: IceCredentials{uFrag, pwd},
WgListenPort: h.config.LocalWgPort, WgListenPort: h.config.LocalWgPort,
Version: version.NetbirdVersion(), Version: version.NetbirdVersion(),
RosenpassPubKey: h.config.RosenpassConfig.PubKey, RosenpassPubKey: h.config.RosenpassConfig.PubKey,
RosenpassAddr: h.config.RosenpassConfig.Addr, RosenpassAddr: h.config.RosenpassConfig.Addr,
SessionID: &sid,
} }
addr, err := h.relay.RelayInstanceAddress()
if err == nil { if addr, err := h.relay.RelayInstanceAddress(); err == nil {
answer.RelaySrvAddress = addr answer.RelaySrvAddress = addr
} }
err = h.signaler.SignalAnswer(answer, h.config.Key) return answer
if err != nil {
return err
}
return nil
} }

View File

@@ -0,0 +1,47 @@
package peer
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
)
const sessionIDSize = 5
type ICESessionID string
// NewICESessionID generates a new session ID for distinguishing sessions
func NewICESessionID() (ICESessionID, error) {
b := make([]byte, sessionIDSize)
if _, err := io.ReadFull(rand.Reader, b); err != nil {
return "", fmt.Errorf("failed to generate session ID: %w", err)
}
return ICESessionID(hex.EncodeToString(b)), nil
}
func ICESessionIDFromBytes(b []byte) (ICESessionID, error) {
if len(b) != sessionIDSize {
return "", fmt.Errorf("invalid session ID length: %d", len(b))
}
return ICESessionID(hex.EncodeToString(b)), nil
}
// Bytes returns the raw bytes of the session ID for protobuf serialization
func (id ICESessionID) Bytes() ([]byte, error) {
if len(id) == 0 {
return nil, fmt.Errorf("ICE session ID is empty")
}
b, err := hex.DecodeString(string(id))
if err != nil {
return nil, fmt.Errorf("invalid ICE session ID encoding: %w", err)
}
if len(b) != sessionIDSize {
return nil, fmt.Errorf("invalid ICE session ID length: expected %d bytes, got %d", sessionIDSize, len(b))
}
return b, nil
}
func (id ICESessionID) String() string {
return string(id)
}

View File

@@ -2,6 +2,7 @@ package peer
import ( import (
"github.com/pion/ice/v3" "github.com/pion/ice/v3"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
signal "github.com/netbirdio/netbird/shared/signal/client" signal "github.com/netbirdio/netbird/shared/signal/client"
@@ -45,6 +46,10 @@ func (s *Signaler) Ready() bool {
// SignalOfferAnswer signals either an offer or an answer to remote peer // SignalOfferAnswer signals either an offer or an answer to remote peer
func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string, bodyType sProto.Body_Type) error { func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string, bodyType sProto.Body_Type) error {
sessionIDBytes, err := offerAnswer.SessionID.Bytes()
if err != nil {
log.Warnf("failed to get session ID bytes: %v", err)
}
msg, err := signal.MarshalCredential( msg, err := signal.MarshalCredential(
s.wgPrivateKey, s.wgPrivateKey,
offerAnswer.WgListenPort, offerAnswer.WgListenPort,
@@ -56,13 +61,13 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,
bodyType, bodyType,
offerAnswer.RosenpassPubKey, offerAnswer.RosenpassPubKey,
offerAnswer.RosenpassAddr, offerAnswer.RosenpassAddr,
offerAnswer.RelaySrvAddress) offerAnswer.RelaySrvAddress,
sessionIDBytes)
if err != nil { if err != nil {
return err return err
} }
err = s.signal.Send(msg) if err = s.signal.Send(msg); err != nil {
if err != nil {
return err return err
} }

View File

@@ -43,6 +43,16 @@ type WorkerICE struct {
hasRelayOnLocally bool hasRelayOnLocally bool
agent *ice.Agent agent *ice.Agent
agentDialerCancel context.CancelFunc
agentConnecting bool // while it is true, drop all incoming offers
lastSuccess time.Time // with this avoid the too frequent ICE agent recreation
// remoteSessionID represents the peer's session identifier from the latest remote offer.
remoteSessionID ICESessionID
// sessionID is used to track the current session ID of the ICE agent
// increase by one when disconnecting the agent
// with it the remote peer can discard the already deprecated offer/answer
// Without it the remote peer may recreate a workable ICE connection
sessionID ICESessionID
muxAgent sync.Mutex muxAgent sync.Mutex
StunTurn []*stun.URI StunTurn []*stun.URI
@@ -57,6 +67,11 @@ type WorkerICE struct {
} }
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) { func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) {
sessionID, err := NewICESessionID()
if err != nil {
return nil, err
}
w := &WorkerICE{ w := &WorkerICE{
ctx: ctx, ctx: ctx,
log: log, log: log,
@@ -67,6 +82,7 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *
statusRecorder: statusRecorder, statusRecorder: statusRecorder,
hasRelayOnLocally: hasRelayOnLocally, hasRelayOnLocally: hasRelayOnLocally,
lastKnownState: ice.ConnectionStateDisconnected, lastKnownState: ice.ConnectionStateDisconnected,
sessionID: sessionID,
} }
localUfrag, localPwd, err := icemaker.GenerateICECredentials() localUfrag, localPwd, err := icemaker.GenerateICECredentials()
@@ -79,14 +95,34 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *
} }
func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.log.Debugf("OnNewOffer for ICE") w.log.Debugf("OnNewOffer for ICE, serial: %s", remoteOfferAnswer.SessionIDString())
w.muxAgent.Lock() w.muxAgent.Lock()
if w.agentConnecting {
w.log.Debugf("agent connection is in progress, skipping the offer")
w.muxAgent.Unlock()
return
}
if w.agent != nil { if w.agent != nil {
// backward compatibility with old clients that do not send session ID
if remoteOfferAnswer.SessionID == nil {
w.log.Debugf("agent already exists, skipping the offer") w.log.Debugf("agent already exists, skipping the offer")
w.muxAgent.Unlock() w.muxAgent.Unlock()
return return
} }
if w.remoteSessionID == *remoteOfferAnswer.SessionID {
w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString())
w.muxAgent.Unlock()
return
}
w.log.Debugf("agent already exists, recreate the connection")
w.agentDialerCancel()
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
// todo consider to switch to Relay connection while establishing a new ICE connection
}
var preferredCandidateTypes []ice.CandidateType var preferredCandidateTypes []ice.CandidateType
if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" { if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
@@ -96,36 +132,124 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
} }
w.log.Debugf("recreate ICE agent") w.log.Debugf("recreate ICE agent")
agentCtx, agentCancel := context.WithCancel(w.ctx) dialerCtx, dialerCancel := context.WithCancel(w.ctx)
agent, err := w.reCreateAgent(agentCancel, preferredCandidateTypes) agent, err := w.reCreateAgent(dialerCancel, preferredCandidateTypes)
if err != nil { if err != nil {
w.log.Errorf("failed to recreate ICE Agent: %s", err) w.log.Errorf("failed to recreate ICE Agent: %s", err)
w.muxAgent.Unlock() w.muxAgent.Unlock()
return return
} }
w.sentExtraSrflx = false
w.agent = agent w.agent = agent
w.agentDialerCancel = dialerCancel
w.agentConnecting = true
w.muxAgent.Unlock() w.muxAgent.Unlock()
w.log.Debugf("gather candidates") go w.connect(dialerCtx, agent, remoteOfferAnswer)
err = w.agent.GatherCandidates() }
if err != nil {
w.log.Debugf("failed to gather candidates: %s", err) // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String())
if w.agent == nil {
w.log.Warnf("ICE Agent is not initialized yet")
return
}
if candidateViaRoutes(candidate, haRoutes) {
return
}
if err := w.agent.AddRemoteCandidate(candidate); err != nil {
w.log.Errorf("error while handling remote candidate")
return
}
}
func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) {
return w.localUfrag, w.localPwd
}
func (w *WorkerICE) InProgress() bool {
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
return w.agentConnecting
}
func (w *WorkerICE) Close() {
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
if w.agent == nil {
return
}
w.agentDialerCancel()
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
w.agent = nil
}
func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) {
agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd)
if err != nil {
return nil, fmt.Errorf("create agent: %w", err)
}
if err := agent.OnCandidate(w.onICECandidate); err != nil {
return nil, err
}
if err := agent.OnConnectionStateChange(w.onConnectionStateChange(agent, dialerCancel)); err != nil {
return nil, err
}
if err := agent.OnSelectedCandidatePairChange(w.onICESelectedCandidatePair); err != nil {
return nil, err
}
if err := agent.OnSuccessfulSelectedPairBindingResponse(w.onSuccessfulSelectedPairBindingResponse); err != nil {
return nil, fmt.Errorf("failed setting binding response callback: %w", err)
}
return agent, nil
}
func (w *WorkerICE) SessionID() ICESessionID {
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
return w.sessionID
}
// will block until connection succeeded
// but it won't release if ICE Agent went into Disconnected or Failed state,
// so we have to cancel it with the provided context once agent detected a broken connection
func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAnswer *OfferAnswer) {
w.log.Debugf("gather candidates")
if err := agent.GatherCandidates(); err != nil {
w.log.Warnf("failed to gather candidates: %s", err)
w.closeAgent(agent, w.agentDialerCancel)
return return
} }
// will block until connection succeeded
// but it won't release if ICE Agent went into Disconnected or Failed state,
// so we have to cancel it with the provided context once agent detected a broken connection
w.log.Debugf("turn agent dial") w.log.Debugf("turn agent dial")
remoteConn, err := w.turnAgentDial(agentCtx, remoteOfferAnswer) remoteConn, err := w.turnAgentDial(ctx, remoteOfferAnswer)
if err != nil { if err != nil {
w.log.Debugf("failed to dial the remote peer: %s", err) w.log.Debugf("failed to dial the remote peer: %s", err)
w.closeAgent(agent, w.agentDialerCancel)
return return
} }
w.log.Debugf("agent dial succeeded") w.log.Debugf("agent dial succeeded")
pair, err := w.agent.GetSelectedCandidatePair() pair, err := agent.GetSelectedCandidatePair()
if err != nil { if err != nil {
w.closeAgent(agent, w.agentDialerCancel)
return return
} }
@@ -152,114 +276,38 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
RelayedOnLocal: isRelayCandidate(pair.Local), RelayedOnLocal: isRelayCandidate(pair.Local),
} }
w.log.Debugf("on ICE conn is ready to use") w.log.Debugf("on ICE conn is ready to use")
go w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
}
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. w.log.Infof("connection succeeded with offer session: %s", remoteOfferAnswer.SessionIDString())
func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
w.muxAgent.Lock() w.muxAgent.Lock()
defer w.muxAgent.Unlock() w.agentConnecting = false
w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String()) w.lastSuccess = time.Now()
if w.agent == nil { if remoteOfferAnswer.SessionID != nil {
w.log.Warnf("ICE Agent is not initialized yet") w.remoteSessionID = *remoteOfferAnswer.SessionID
return
} }
w.muxAgent.Unlock()
if candidateViaRoutes(candidate, haRoutes) { // todo: the potential problem is a race between the onConnectionStateChange
return w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
}
err := w.agent.AddRemoteCandidate(candidate)
if err != nil {
w.log.Errorf("error while handling remote candidate")
return
}
} }
func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) { func (w *WorkerICE) closeAgent(agent *ice.Agent, cancel context.CancelFunc) {
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
return w.localUfrag, w.localPwd
}
func (w *WorkerICE) Close() {
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
if w.agent == nil {
return
}
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
}
func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) {
w.sentExtraSrflx = false
agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd)
if err != nil {
return nil, fmt.Errorf("create agent: %w", err)
}
err = agent.OnCandidate(w.onICECandidate)
if err != nil {
return nil, err
}
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
switch state {
case ice.ConnectionStateConnected:
w.lastKnownState = ice.ConnectionStateConnected
return
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
if w.lastKnownState == ice.ConnectionStateConnected {
w.lastKnownState = ice.ConnectionStateDisconnected
w.conn.onICEStateDisconnected()
}
w.closeAgent(agentCancel)
default:
return
}
})
if err != nil {
return nil, err
}
err = agent.OnSelectedCandidatePairChange(w.onICESelectedCandidatePair)
if err != nil {
return nil, err
}
err = agent.OnSuccessfulSelectedPairBindingResponse(func(p *ice.CandidatePair) {
err := w.statusRecorder.UpdateLatency(w.config.Key, p.Latency())
if err != nil {
w.log.Debugf("failed to update latency for peer: %s", err)
return
}
})
if err != nil {
return nil, fmt.Errorf("failed setting binding response callback: %w", err)
}
return agent, nil
}
func (w *WorkerICE) closeAgent(cancel context.CancelFunc) {
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
cancel() cancel()
if w.agent == nil { if err := agent.Close(); err != nil {
return
}
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err) w.log.Warnf("failed to close ICE agent: %s", err)
} }
w.muxAgent.Lock()
sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
}
w.sessionID = sessionID
if w.agent == agent {
w.agent = nil w.agent = nil
w.agentConnecting = false
}
w.muxAgent.Unlock()
} }
func (w *WorkerICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) { func (w *WorkerICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) {
@@ -331,6 +379,32 @@ func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidat
w.config.Key) w.config.Key)
} }
func (w *WorkerICE) onConnectionStateChange(agent *ice.Agent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
return func(state ice.ConnectionState) {
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
switch state {
case ice.ConnectionStateConnected:
w.lastKnownState = ice.ConnectionStateConnected
return
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
if w.lastKnownState == ice.ConnectionStateConnected {
w.lastKnownState = ice.ConnectionStateDisconnected
w.conn.onICEStateDisconnected()
}
w.closeAgent(agent, dialerCancel)
default:
return
}
}
}
func (w *WorkerICE) onSuccessfulSelectedPairBindingResponse(pair *ice.CandidatePair) {
if err := w.statusRecorder.UpdateLatency(w.config.Key, pair.Latency()); err != nil {
w.log.Debugf("failed to update latency for peer: %s", err)
return
}
}
func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool { func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool {
if !w.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port { if !w.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port {
return true return true

View File

@@ -52,7 +52,7 @@ func UnMarshalCredential(msg *proto.Message) (*Credential, error) {
} }
// MarshalCredential marshal a Credential instance and returns a Message object // MarshalCredential marshal a Credential instance and returns a Message object
func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey string, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string) (*proto.Message, error) { func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey string, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string, sessionID []byte) (*proto.Message, error) {
return &proto.Message{ return &proto.Message{
Key: myKey.PublicKey().String(), Key: myKey.PublicKey().String(),
RemoteKey: remoteKey, RemoteKey: remoteKey,
@@ -66,6 +66,7 @@ func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey string, credenti
RosenpassServerAddr: rosenpassAddr, RosenpassServerAddr: rosenpassAddr,
}, },
RelayServerAddress: relaySrvAddress, RelayServerAddress: relaySrvAddress,
SessionId: sessionID,
}, },
}, nil }, nil
} }

View File

@@ -45,19 +45,10 @@ type GrpcClient struct {
connStateCallbackLock sync.RWMutex connStateCallbackLock sync.RWMutex
onReconnectedListenerFn func() onReconnectedListenerFn func()
}
func (c *GrpcClient) StreamConnected() bool { decryptionWorker *Worker
return c.status == StreamConnected decryptionWorkerCancel context.CancelFunc
} decryptionWg sync.WaitGroup
func (c *GrpcClient) GetStatus() Status {
return c.status
}
// Close Closes underlying connections to the Signal Exchange
func (c *GrpcClient) Close() error {
return c.signalConn.Close()
} }
// NewClient creates a new Signal client // NewClient creates a new Signal client
@@ -93,6 +84,25 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
}, nil }, nil
} }
func (c *GrpcClient) StreamConnected() bool {
return c.status == StreamConnected
}
func (c *GrpcClient) GetStatus() Status {
return c.status
}
// Close Closes underlying connections to the Signal Exchange
func (c *GrpcClient) Close() error {
if c.decryptionWorkerCancel != nil {
c.decryptionWorkerCancel()
}
c.decryptionWg.Wait()
c.decryptionWorker = nil
return c.signalConn.Close()
}
// SetConnStateListener set the ConnStateNotifier // SetConnStateListener set the ConnStateNotifier
func (c *GrpcClient) SetConnStateListener(notifier ConnStateNotifier) { func (c *GrpcClient) SetConnStateListener(notifier ConnStateNotifier) {
c.connStateCallbackLock.Lock() c.connStateCallbackLock.Lock()
@@ -148,8 +158,12 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes
log.Infof("connected to the Signal Service stream") log.Infof("connected to the Signal Service stream")
c.notifyConnected() c.notifyConnected()
// Start worker pool if not already started
c.startEncryptionWorker(msgHandler)
// start receiving messages from the Signal stream (from other peers through signal) // start receiving messages from the Signal stream (from other peers through signal)
err = c.receive(stream, msgHandler) err = c.receive(stream)
if err != nil { if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
log.Debugf("signal connection context has been canceled, this usually indicates shutdown") log.Debugf("signal connection context has been canceled, this usually indicates shutdown")
@@ -174,6 +188,7 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes
return nil return nil
} }
func (c *GrpcClient) notifyStreamDisconnected() { func (c *GrpcClient) notifyStreamDisconnected() {
c.mux.Lock() c.mux.Lock()
defer c.mux.Unlock() defer c.mux.Unlock()
@@ -382,11 +397,11 @@ func (c *GrpcClient) Send(msg *proto.Message) error {
} }
// receive receives messages from other peers coming through the Signal Exchange // receive receives messages from other peers coming through the Signal Exchange
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient, // and distributes them to worker threads for processing
msgHandler func(msg *proto.Message) error) error { func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) error {
for { for {
msg, err := stream.Recv() msg, err := stream.Recv()
// Handle errors immediately
switch s, ok := status.FromError(err); { switch s, ok := status.FromError(err); {
case ok && s.Code() == codes.Canceled: case ok && s.Code() == codes.Canceled:
log.Debugf("stream canceled (usually indicates shutdown)") log.Debugf("stream canceled (usually indicates shutdown)")
@@ -398,22 +413,35 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient,
log.Debugf("Signal Service stream closed by server") log.Debugf("Signal Service stream closed by server")
return err return err
case err != nil: case err != nil:
log.Errorf("Stream receive error: %v", err)
return err return err
} }
log.Tracef("received a new message from Peer [fingerprint: %s]", msg.Key)
decryptedMessage, err := c.decryptMessage(msg) if msg == nil {
if err != nil { continue
log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
} }
err = msgHandler(decryptedMessage) if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
log.Errorf("failed to add message to decryption worker: %v", err)
}
}
}
if err != nil { func (c *GrpcClient) startEncryptionWorker(handler func(msg *proto.Message) error) {
log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error()) if c.decryptionWorker != nil {
// todo send something?? return
}
} }
c.decryptionWorker = NewWorker(c.decryptMessage, handler)
workerCtx, workerCancel := context.WithCancel(context.Background())
c.decryptionWorkerCancel = workerCancel
c.decryptionWg.Add(1)
go func() {
defer workerCancel()
c.decryptionWorker.Work(workerCtx)
c.decryptionWg.Done()
}()
} }
func (c *GrpcClient) notifyDisconnected(err error) { func (c *GrpcClient) notifyDisconnected(err error) {

View File

@@ -0,0 +1,55 @@
package client
import (
"context"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/shared/signal/proto"
)
type Worker struct {
decryptMessage func(msg *proto.EncryptedMessage) (*proto.Message, error)
handler func(msg *proto.Message) error
encryptedMsgPool chan *proto.EncryptedMessage
}
func NewWorker(decryptFn func(msg *proto.EncryptedMessage) (*proto.Message, error), handlerFn func(msg *proto.Message) error) *Worker {
return &Worker{
decryptMessage: decryptFn,
handler: handlerFn,
encryptedMsgPool: make(chan *proto.EncryptedMessage, 1),
}
}
func (w *Worker) AddMsg(ctx context.Context, msg *proto.EncryptedMessage) error {
// this is blocker because do not want to drop messages here
select {
case w.encryptedMsgPool <- msg:
case <-ctx.Done():
}
return nil
}
func (w *Worker) Work(ctx context.Context) {
for {
select {
case msg := <-w.encryptedMsgPool:
decryptedMessage, err := w.decryptMessage(msg)
if err != nil {
log.Errorf("failed to decrypt message: %v", err)
continue
}
if err := w.handler(decryptedMessage); err != nil {
log.Errorf("failed to handle message: %v", err)
continue
}
case <-ctx.Done():
log.Infof("Message worker stopping due to context cancellation")
return
}
}
}

View File

@@ -230,6 +230,7 @@ type Body struct {
RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"` RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"`
// relayServerAddress is url of the relay server // relayServerAddress is url of the relay server
RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"` RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"`
SessionId []byte `protobuf:"bytes,10,opt,name=sessionId,proto3,oneof" json:"sessionId,omitempty"`
} }
func (x *Body) Reset() { func (x *Body) Reset() {
@@ -320,6 +321,13 @@ func (x *Body) GetRelayServerAddress() string {
return "" return ""
} }
func (x *Body) GetSessionId() []byte {
if x != nil {
return x.SessionId
}
return nil
}
// Mode indicates a connection mode // Mode indicates a connection mode
type Mode struct { type Mode struct {
state protoimpl.MessageState state protoimpl.MessageState
@@ -443,7 +451,7 @@ var file_signalexchange_proto_rawDesc = []byte{
0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62,
0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52,
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xb3, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f,
0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
@@ -466,34 +474,37 @@ var file_signalexchange_proto_rawDesc = []byte{
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01,
0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53, 0x6e, 0x49, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73,
0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70,
0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x12, 0x0b, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06,
0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x22, 0x2e, 0x0a, 0x04, 0x4d, 0x41, 0x4e, 0x53, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44,
0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x49, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10,
0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01, 0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c,
0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, 0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04,
0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01,
0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01,
0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f,
0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, 0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72,
0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73,
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c,
0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65,
0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70,
0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d,
0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e,
0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45,
0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@@ -601,6 +612,7 @@ func file_signalexchange_proto_init() {
} }
} }
} }
file_signalexchange_proto_msgTypes[2].OneofWrappers = []interface{}{}
file_signalexchange_proto_msgTypes[3].OneofWrappers = []interface{}{} file_signalexchange_proto_msgTypes[3].OneofWrappers = []interface{}{}
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{

View File

@@ -64,6 +64,8 @@ message Body {
// relayServerAddress is url of the relay server // relayServerAddress is url of the relay server
string relayServerAddress = 8; string relayServerAddress = 8;
optional bytes sessionId = 10;
} }
// Mode indicates a connection mode // Mode indicates a connection mode