2024-09-08 12:06:14 +02:00
|
|
|
package peer
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"net"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
|
|
|
relayClient "github.com/netbirdio/netbird/relay/client"
|
|
|
|
)
|
|
|
|
|
|
|
|
type RelayConnInfo struct {
|
|
|
|
relayedConn net.Conn
|
|
|
|
rosenpassPubKey []byte
|
|
|
|
rosenpassAddr string
|
|
|
|
}
|
|
|
|
|
|
|
|
type WorkerRelay struct {
|
|
|
|
log *log.Entry
|
2024-10-24 11:43:14 +02:00
|
|
|
isController bool
|
2024-09-08 12:06:14 +02:00
|
|
|
config ConnConfig
|
2025-02-10 10:32:50 +01:00
|
|
|
conn *Conn
|
2024-09-08 12:06:14 +02:00
|
|
|
relayManager relayClient.ManagerService
|
|
|
|
|
2025-02-10 10:32:50 +01:00
|
|
|
relayedConn net.Conn
|
|
|
|
relayLock sync.Mutex
|
2024-09-08 12:06:14 +02:00
|
|
|
|
|
|
|
relaySupportedOnRemotePeer atomic.Bool
|
2025-02-10 10:32:50 +01:00
|
|
|
|
|
|
|
wgWatcher *WGWatcher
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
2025-02-10 10:32:50 +01:00
|
|
|
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager relayClient.ManagerService) *WorkerRelay {
|
2024-09-08 12:06:14 +02:00
|
|
|
r := &WorkerRelay{
|
|
|
|
log: log,
|
2024-10-24 11:43:14 +02:00
|
|
|
isController: ctrl,
|
2024-09-08 12:06:14 +02:00
|
|
|
config: config,
|
2025-02-10 10:32:50 +01:00
|
|
|
conn: conn,
|
2024-09-08 12:06:14 +02:00
|
|
|
relayManager: relayManager,
|
2025-02-10 10:32:50 +01:00
|
|
|
wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key),
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|
|
|
if !w.isRelaySupported(remoteOfferAnswer) {
|
|
|
|
w.log.Infof("Relay is not supported by remote peer")
|
|
|
|
w.relaySupportedOnRemotePeer.Store(false)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
w.relaySupportedOnRemotePeer.Store(true)
|
|
|
|
|
|
|
|
// the relayManager will return with error in case if the connection has lost with relay server
|
|
|
|
currentRelayAddress, err := w.relayManager.RelayInstanceAddress()
|
|
|
|
if err != nil {
|
|
|
|
w.log.Errorf("failed to handle new offer: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
srv := w.preferredRelayServer(currentRelayAddress, remoteOfferAnswer.RelaySrvAddress)
|
|
|
|
|
|
|
|
relayedConn, err := w.relayManager.OpenConn(srv, w.config.Key)
|
|
|
|
if err != nil {
|
|
|
|
if errors.Is(err, relayClient.ErrConnAlreadyExists) {
|
2024-10-02 15:14:09 +02:00
|
|
|
w.log.Debugf("handled offer by reusing existing relay connection")
|
2024-09-08 12:06:14 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
w.log.Errorf("failed to open connection via Relay: %s", err)
|
|
|
|
return
|
|
|
|
}
|
2024-10-24 11:43:14 +02:00
|
|
|
|
2024-09-08 12:06:14 +02:00
|
|
|
w.relayLock.Lock()
|
|
|
|
w.relayedConn = relayedConn
|
|
|
|
w.relayLock.Unlock()
|
|
|
|
|
2025-02-10 10:32:50 +01:00
|
|
|
err = w.relayManager.AddCloseListener(srv, w.onRelayClientDisconnected)
|
2024-09-08 12:06:14 +02:00
|
|
|
if err != nil {
|
|
|
|
log.Errorf("failed to add close listener: %s", err)
|
|
|
|
_ = relayedConn.Close()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
w.log.Debugf("peer conn opened via Relay: %s", srv)
|
2025-02-10 10:32:50 +01:00
|
|
|
go w.conn.onRelayConnectionIsReady(RelayConnInfo{
|
2024-09-08 12:06:14 +02:00
|
|
|
relayedConn: relayedConn,
|
|
|
|
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
|
|
|
|
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) {
|
2025-02-10 10:32:50 +01:00
|
|
|
w.wgWatcher.EnableWgWatcher(ctx, w.onWGDisconnected)
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) DisableWgWatcher() {
|
2025-02-10 10:32:50 +01:00
|
|
|
w.wgWatcher.DisableWgWatcher()
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
|
|
|
|
return w.relayManager.RelayInstanceAddress()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) IsRelayConnectionSupportedWithPeer() bool {
|
|
|
|
return w.relaySupportedOnRemotePeer.Load() && w.RelayIsSupportedLocally()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) RelayIsSupportedLocally() bool {
|
|
|
|
return w.relayManager.HasRelayAddress()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) CloseConn() {
|
|
|
|
w.relayLock.Lock()
|
|
|
|
defer w.relayLock.Unlock()
|
|
|
|
if w.relayedConn == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2025-02-10 10:32:50 +01:00
|
|
|
if err := w.relayedConn.Close(); err != nil {
|
2024-09-08 12:06:14 +02:00
|
|
|
w.log.Warnf("failed to close relay connection: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-02-10 10:32:50 +01:00
|
|
|
func (w *WorkerRelay) onWGDisconnected() {
|
|
|
|
w.relayLock.Lock()
|
|
|
|
_ = w.relayedConn.Close()
|
|
|
|
w.relayLock.Unlock()
|
2024-09-12 19:18:02 +02:00
|
|
|
|
2025-02-10 10:32:50 +01:00
|
|
|
w.conn.onRelayDisconnected()
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
|
|
|
|
if !w.relayManager.HasRelayAddress() {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return answer.RelaySrvAddress != ""
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string {
|
2024-10-24 11:43:14 +02:00
|
|
|
if w.isController {
|
2024-09-08 12:06:14 +02:00
|
|
|
return myRelayAddress
|
|
|
|
}
|
|
|
|
return remoteRelayAddress
|
|
|
|
}
|
|
|
|
|
2025-02-10 10:32:50 +01:00
|
|
|
func (w *WorkerRelay) onRelayClientDisconnected() {
|
|
|
|
w.wgWatcher.DisableWgWatcher()
|
|
|
|
go w.conn.onRelayDisconnected()
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|