mirror of
https://github.com/netbirdio/netbird.git
synced 2024-12-14 19:00:50 +01:00
211 lines
6.1 KiB
Go
211 lines
6.1 KiB
Go
package peer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/netbirdio/netbird/version"
|
|
)
|
|
|
|
// handshakeCacheTimeout is the maximum age of a stored remote offer/answer
// for which cachedHandshake still returns it instead of reporting a miss.
const handshakeCacheTimeout = 3 * time.Second
|
|
|
|
// ErrSignalIsNotReady is a sentinel error with the message "signal is not ready".
// NOTE(review): it is not referenced in this file; presumably it is returned
// by the signalling layer when it cannot send yet — confirm at the call sites.
var ErrSignalIsNotReady = errors.New("signal is not ready")
|
|
|
|
// DoHandshake is a function type that takes no arguments and returns a remote
// OfferAnswer or an error.
// NOTE(review): it is not referenced in this file; presumably it abstracts a
// handshake attempt for callers elsewhere in the package — confirm.
type DoHandshake func() (*OfferAnswer, error)
|
|
|
|
// IceCredentials groups the ICE protocol credentials that travel inside an
// OfferAnswer.
//
// The field order matters: values of this type are built with unkeyed
// composite literals (UFrag first, Pwd second).
type IceCredentials struct {
	// UFrag is the ICE username fragment.
	UFrag string
	// Pwd is the ICE password.
	Pwd string
}
|
|
|
|
// OfferAnswer represents a session establishment offer or answer
|
|
type OfferAnswer struct {
|
|
IceCredentials IceCredentials
|
|
// WgListenPort is a remote WireGuard listen port.
|
|
// This field is used when establishing a direct WireGuard connection without any proxy.
|
|
// We can set the remote peer's endpoint with this port.
|
|
WgListenPort int
|
|
|
|
// Version of NetBird Agent
|
|
Version string
|
|
// RosenpassPubKey is the Rosenpass public key of the remote peer when receiving this message
|
|
// This value is the local Rosenpass server public key when sending the message
|
|
RosenpassPubKey []byte
|
|
// RosenpassAddr is the Rosenpass server address (IP:port) of the remote peer when receiving this message
|
|
// This value is the local Rosenpass server address when sending the message
|
|
RosenpassAddr string
|
|
|
|
// relay server address
|
|
RelaySrvAddress string
|
|
}
|
|
|
|
// HandshakeArgs carries the local values that Handshaker copies into the
// offer and the answer it signals to the remote peer.
type HandshakeArgs struct {
	// IceUFrag is the local ICE username fragment.
	IceUFrag string
	// IcePwd is the local ICE password.
	IcePwd string
	// RelayAddr is sent to the remote peer as OfferAnswer.RelaySrvAddress.
	RelayAddr string
}
|
|
|
|
type Handshaker struct {
|
|
mu sync.Mutex
|
|
ctx context.Context
|
|
config ConnConfig
|
|
signaler *Signaler
|
|
|
|
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
|
|
remoteOffersCh chan OfferAnswer
|
|
// remoteAnswerCh is a channel used to wait for remote credentials answer (confirmation of our offer) to proceed with the connection
|
|
remoteAnswerCh chan OfferAnswer
|
|
|
|
remoteOfferAnswer *OfferAnswer
|
|
remoteOfferAnswerCreated time.Time
|
|
|
|
handshakeArgs HandshakeArgs
|
|
}
|
|
|
|
func NewHandshaker(ctx context.Context, config ConnConfig, signaler *Signaler) *Handshaker {
|
|
return &Handshaker{
|
|
ctx: ctx,
|
|
config: config,
|
|
signaler: signaler,
|
|
remoteOffersCh: make(chan OfferAnswer),
|
|
remoteAnswerCh: make(chan OfferAnswer),
|
|
}
|
|
}
|
|
|
|
func (h *Handshaker) Handshake(args HandshakeArgs) (*OfferAnswer, error) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
h.handshakeArgs = args
|
|
|
|
cachedOfferAnswer, ok := h.cachedHandshake()
|
|
if ok {
|
|
return cachedOfferAnswer, nil
|
|
}
|
|
|
|
err := h.sendOffer(args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Only continue once we got a connection confirmation from the remote peer.
|
|
// The connection timeout could have happened before a confirmation received from the remote.
|
|
// The connection could have also been closed externally (e.g. when we received an update from the management that peer shouldn't be connected)
|
|
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
h.storeRemoteOfferAnswer(remoteOfferAnswer)
|
|
|
|
log.Debugf("received connection confirmation from peer %s running version %s and with remote WireGuard listen port %d",
|
|
h.config.Key, remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
|
|
|
|
return remoteOfferAnswer, nil
|
|
}
|
|
|
|
// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
|
// doesn't block, discards the message if connection wasn't ready
|
|
func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) bool {
|
|
select {
|
|
case h.remoteOffersCh <- offer:
|
|
return true
|
|
default:
|
|
log.Debugf("OnRemoteOffer skipping message from peer %s because is not ready", h.config.Key)
|
|
// connection might not be ready yet to receive so we ignore the message
|
|
return false
|
|
}
|
|
}
|
|
|
|
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
|
// doesn't block, discards the message if connection wasn't ready
|
|
func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool {
|
|
select {
|
|
case h.remoteAnswerCh <- answer:
|
|
return true
|
|
default:
|
|
// connection might not be ready yet to receive so we ignore the message
|
|
log.Debugf("OnRemoteAnswer skipping message from peer %s because is not ready", h.config.Key)
|
|
return false
|
|
}
|
|
}
|
|
|
|
// sendOffer prepares local user credentials and signals them to the remote peer
|
|
func (h *Handshaker) sendOffer(args HandshakeArgs) error {
|
|
offer := OfferAnswer{
|
|
IceCredentials: IceCredentials{args.IceUFrag, args.IcePwd},
|
|
WgListenPort: h.config.LocalWgPort,
|
|
Version: version.NetbirdVersion(),
|
|
RosenpassPubKey: h.config.RosenpassPubKey,
|
|
RosenpassAddr: h.config.RosenpassAddr,
|
|
RelaySrvAddress: args.RelayAddr,
|
|
}
|
|
|
|
return h.signaler.SignalOffer(offer, h.config.Key)
|
|
}
|
|
|
|
func (h *Handshaker) sendAnswer() error {
|
|
log.Debugf("sending answer to %s", h.config.Key)
|
|
answer := OfferAnswer{
|
|
IceCredentials: IceCredentials{h.handshakeArgs.IceUFrag, h.handshakeArgs.IcePwd},
|
|
WgListenPort: h.config.LocalWgPort,
|
|
Version: version.NetbirdVersion(),
|
|
RosenpassPubKey: h.config.RosenpassPubKey,
|
|
RosenpassAddr: h.config.RosenpassAddr,
|
|
RelaySrvAddress: h.handshakeArgs.RelayAddr,
|
|
}
|
|
err := h.signaler.SignalAnswer(answer, h.config.Key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
|
|
timeout := time.NewTimer(h.config.Timeout)
|
|
defer timeout.Stop()
|
|
|
|
select {
|
|
case remoteOfferAnswer := <-h.remoteOffersCh:
|
|
// received confirmation from the remote peer -> ready to proceed
|
|
err := h.sendAnswer()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &remoteOfferAnswer, nil
|
|
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
|
return &remoteOfferAnswer, nil
|
|
case <-timeout.C:
|
|
return nil, NewConnectionTimeoutError(h.config.Key, h.config.Timeout)
|
|
case <-h.ctx.Done():
|
|
// closed externally
|
|
return nil, NewConnectionClosedError(h.config.Key)
|
|
}
|
|
}
|
|
|
|
func (h *Handshaker) storeRemoteOfferAnswer(answer *OfferAnswer) {
|
|
h.remoteOfferAnswer = answer
|
|
h.remoteOfferAnswerCreated = time.Now()
|
|
}
|
|
|
|
func (h *Handshaker) cachedHandshake() (*OfferAnswer, bool) {
|
|
if h.remoteOfferAnswer == nil {
|
|
return nil, false
|
|
}
|
|
|
|
if time.Since(h.remoteOfferAnswerCreated) > handshakeCacheTimeout {
|
|
return nil, false
|
|
}
|
|
|
|
return h.remoteOfferAnswer, true
|
|
}
|