This commit is contained in:
Zoltán Papp 2024-07-29 21:53:07 +02:00
parent 12f472c58c
commit 5be33d668b
19 changed files with 204 additions and 23 deletions

View File

@ -1,6 +1,7 @@
package auth package auth
// AllowAllAuth is a Validator that allows all connections. // AllowAllAuth is a Validator that allows all connections.
// Used this for testing purposes only.
type AllowAllAuth struct { type AllowAllAuth struct {
} }

26
relay/auth/doc.go Normal file
View File

@ -0,0 +1,26 @@
/*
Package auth manages the authentication process with the relay server.
Key Components:
Validator: The Validator interface defines the Validate method. Any type that provides this method can be used as a
Validator.
Methods:
Validate(any): This method is defined in the Validator interface and is used to validate the authentication.
Usage:
To create a new AllowAllAuth validator, simply instantiate it:
validator := &auth.AllowAllAuth{}
To validate the authentication, use the Validate method:
err := validator.Validate(any)
This package provides a simple and effective way to manage authentication with the relay server, ensuring that the
peers are authenticated properly.
*/
package auth

8
relay/auth/hmac/doc.go Normal file
View File

@ -0,0 +1,8 @@
/*
This package uses a similar HMAC method for authentication with the TURN server. The Management server provides the
tokens for the peers. The peers manage these tokens in the token store. The token store is a simple thread safe store
that keeps the tokens in memory. These tokens are used to authenticate the peers with the Relay server in the hello
message.
*/
package hmac

View File

@ -43,6 +43,7 @@ type TimedHMAC struct {
timeToLive time.Duration timeToLive time.Duration
} }
// NewTimedHMAC creates a new TimedHMAC instance
func NewTimedHMAC(secret string, timeToLive time.Duration) *TimedHMAC { func NewTimedHMAC(secret string, timeToLive time.Duration) *TimedHMAC {
return &TimedHMAC{ return &TimedHMAC{
secret: secret, secret: secret,
@ -50,7 +51,8 @@ func NewTimedHMAC(secret string, timeToLive time.Duration) *TimedHMAC {
} }
} }
// GenerateToken generates new time-based secret token - basically Payload is a unix timestamp and Signature is a HMAC hash of a timestamp with a preshared TURN secret // GenerateToken generates new time-based secret token - basically Payload is a unix timestamp and Signature is a HMAC
// hash of a timestamp with a preshared TURN secret
func (m *TimedHMAC) GenerateToken() (*Token, error) { func (m *TimedHMAC) GenerateToken() (*Token, error) {
timeAuth := time.Now().Add(m.timeToLive).Unix() timeAuth := time.Now().Add(m.timeToLive).Unix()
timeStamp := fmt.Sprint(timeAuth) timeStamp := fmt.Sprint(timeAuth)
@ -66,6 +68,7 @@ func (m *TimedHMAC) GenerateToken() (*Token, error) {
}, nil }, nil
} }
// Validate checks if the token is valid
func (m *TimedHMAC) Validate(token Token) error { func (m *TimedHMAC) Validate(token Token) error {
expectedMAC, err := m.generate(token.Payload) expectedMAC, err := m.generate(token.Payload)
if err != nil { if err != nil {

View File

@ -1,5 +1,6 @@
package auth package auth
// Validator is an interface that defines the Validate method.
type Validator interface { type Validator interface {
Validate(any) error Validate(any) error
} }

View File

@ -6,6 +6,7 @@ import (
"time" "time"
) )
// Conn represent a connection to a relayed remote peer.
type Conn struct { type Conn struct {
client *Client client *Client
dstID []byte dstID []byte
@ -14,6 +15,12 @@ type Conn struct {
instanceURL *RelayAddr instanceURL *RelayAddr
} }
// NewConn creates a new connection to a relayed remote peer.
// client: the client instance, it used to send messages to the destination peer
// dstID: the destination peer ID
// dstStringID: the destination peer ID in string format
// messageChan: the channel where the messages will be received
// instanceURL: the relay instance URL, it used to get the proper server instance address for the remote peer
func NewConn(client *Client, dstID []byte, dstStringID string, messageChan chan Msg, instanceURL *RelayAddr) *Conn { func NewConn(client *Client, dstID []byte, dstStringID string, messageChan chan Msg, instanceURL *RelayAddr) *Conn {
c := &Conn{ c := &Conn{
client: client, client: client,

12
relay/client/doc.go Normal file
View File

@ -0,0 +1,12 @@
/*
Package client contains the implementation of the Relay client.
The Relay client is responsible for establishing a connection with the Relay server and sending and receiving messages,
Keep persistent connection with the Relay server and handle the connection issues.
It uses the WebSocket protocol for communication and optionally supports TLS (Transport Layer Security).
If a peer wants to communicate with a peer on a different relay server, the manager will establish a new connection to
the relay server. The connection with these relay servers will be closed if there is no active connection. The peers
negotiate the common relay instance via signaling service.
*/
package client

View File

@ -11,11 +11,13 @@ var (
reconnectingTimeout = 5 * time.Second reconnectingTimeout = 5 * time.Second
) )
// Guard manage the reconnection tries to the Relay server in case of disconnection event.
type Guard struct { type Guard struct {
ctx context.Context ctx context.Context
relayClient *Client relayClient *Client
} }
// NewGuard creates a new guard for the relay client.
func NewGuard(context context.Context, relayClient *Client) *Guard { func NewGuard(context context.Context, relayClient *Client) *Guard {
g := &Guard{ g := &Guard{
ctx: context, ctx: context,
@ -24,8 +26,9 @@ func NewGuard(context context.Context, relayClient *Client) *Guard {
return g return g
} }
// OnDisconnected is called when the relay client is disconnected from the relay server. It will trigger the reconnection
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent
func (g *Guard) OnDisconnected() { func (g *Guard) OnDisconnected() {
// todo prevent multiple reconnect
ticker := time.NewTicker(reconnectingTimeout) ticker := time.NewTicker(reconnectingTimeout)
defer ticker.Stop() defer ticker.Stop()

View File

@ -34,6 +34,7 @@ func NewRelayTrack() *RelayTrack {
type OnServerCloseListener func() type OnServerCloseListener func()
// ManagerService is the interface for the relay manager.
type ManagerService interface { type ManagerService interface {
Serve() error Serve() error
OpenConn(serverAddress, peerKey string) (net.Conn, error) OpenConn(serverAddress, peerKey string) (net.Conn, error)
@ -44,9 +45,9 @@ type ManagerService interface {
UpdateToken(token *relayAuth.Token) UpdateToken(token *relayAuth.Token)
} }
// Manager is a manager for the relay client. It establish one persistent connection to the given relay server. In case // Manager is a manager for the relay client instances. It establishes one persistent connection to the given relay URL
// of network error the manager will try to reconnect to the server. // and automatically reconnect to them in case disconnection.
// The manager also manage temproary relay connection. If a client wants to communicate with an another client on a // The manager also manage temporary relay connection. If a client wants to communicate with a client on a
// different relay server, the manager will establish a new connection to the relay server. The connection with these // different relay server, the manager will establish a new connection to the relay server. The connection with these
// relay servers will be closed if there is no active connection. Periodically the manager will check if there is any // relay servers will be closed if there is no active connection. Periodically the manager will check if there is any
// unused relay connection and close it. // unused relay connection and close it.
@ -66,6 +67,8 @@ type Manager struct {
listenerLock sync.Mutex listenerLock sync.Mutex
} }
// NewManager creates a new manager instance.
// The serverURL address can be empty. In this case, the manager will not serve.
func NewManager(ctx context.Context, serverURL string, peerID string) *Manager { func NewManager(ctx context.Context, serverURL string, peerID string) *Manager {
return &Manager{ return &Manager{
ctx: ctx, ctx: ctx,
@ -77,7 +80,8 @@ func NewManager(ctx context.Context, serverURL string, peerID string) *Manager {
} }
} }
// Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop. // Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop for
// the unused relay connections. The manager will automatically reconnect to the relay server in case of disconnection.
func (m *Manager) Serve() error { func (m *Manager) Serve() error {
if m.relayClient != nil { if m.relayClient != nil {
return fmt.Errorf("manager already serving") return fmt.Errorf("manager already serving")
@ -101,7 +105,7 @@ func (m *Manager) Serve() error {
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be // OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
// established via the relay server. If the peer is on a different relay server, the manager will establish a new // established via the relay server. If the peer is on a different relay server, the manager will establish a new
// connection to the relay server. // connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) { func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
if m.relayClient == nil { if m.relayClient == nil {
return nil, errRelayClientNotConnected return nil, errRelayClientNotConnected
@ -129,6 +133,8 @@ func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
return netConn, err return netConn, err
} }
// AddCloseListener adds a listener to the given server instance address. The listener will be called if the connection
// closed.
func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error { func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error {
foreign, err := m.isForeignServer(serverAddress) foreign, err := m.isForeignServer(serverAddress)
if err != nil { if err != nil {
@ -145,8 +151,8 @@ func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServ
return nil return nil
} }
// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is lost. // RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is
// This address will be sent to the target peer to choose the common relay server for the communication. // lost. This address will be sent to the target peer to choose the common relay server for the communication.
func (m *Manager) RelayInstanceAddress() (string, error) { func (m *Manager) RelayInstanceAddress() (string, error) {
if m.relayClient == nil { if m.relayClient == nil {
return "", errRelayClientNotConnected return "", errRelayClientNotConnected
@ -159,10 +165,13 @@ func (m *Manager) ServerURL() string {
return m.serverURL return m.serverURL
} }
// HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with
// Relay service.
func (m *Manager) HasRelayAddress() bool { func (m *Manager) HasRelayAddress() bool {
return m.serverURL != "" return m.serverURL != ""
} }
// UpdateToken updates the token in the token store.
func (m *Manager) UpdateToken(token *relayAuth.Token) { func (m *Manager) UpdateToken(token *relayAuth.Token) {
m.tokenStore.UpdateToken(token) m.tokenStore.UpdateToken(token)
} }

8
relay/doc.go Normal file
View File

@ -0,0 +1,8 @@
//Package relay
/*
The `relay` package contains the implementation of the Relay server and client. The Relay server can be used to relay
messages between peers on a single network channel. In this implementation the transport layer is the WebSocket
protocol.
*/
package relay

17
relay/healthcheck/doc.go Normal file
View File

@ -0,0 +1,17 @@
/*
The `healthcheck` package is responsible for managing the health checks between the client and the relay server. It
ensures that the connection between the client and the server are alive and functioning properly.
The `Sender` struct is responsible for sending health check signals to the receiver. The receiver listens for these
signals and sends a new signal back to the sender to acknowledge that the signal has been received. If the sender does
not receive an acknowledgment signal within a certain time frame, it will send a timeout signal via timeout channel
and stop working.
The `Receiver` struct is responsible for receiving the health check signals from the sender. If the receiver does not
receive a signal within a certain time frame, it will send a timeout signal via the OnTimeout channel and stop working.
In the Relay usage the signal is sent to the peer in message type Healthcheck. In case of timeout the connection is
closed and the peer is removed from the relay.
*/
package healthcheck

View File

@ -15,8 +15,10 @@ var (
// If the receiver does not receive the signal in a certain time, it will send a timeout signal and stop to work // If the receiver does not receive the signal in a certain time, it will send a timeout signal and stop to work
// It will also stop if the context is canceled // It will also stop if the context is canceled
type Sender struct { type Sender struct {
// HealthCheck is a channel to send health check signal to the peer
HealthCheck chan struct{} HealthCheck chan struct{}
Timeout chan struct{} // Timeout is a channel to the health check signal is not received in a certain time
Timeout chan struct{}
ctx context.Context ctx context.Context
ack chan struct{} ack chan struct{}
@ -35,6 +37,7 @@ func NewSender(ctx context.Context) *Sender {
return hc return hc
} }
// OnHCResponse sends an acknowledgment signal to the sender
func (hc *Sender) OnHCResponse() { func (hc *Sender) OnHCResponse() {
select { select {
case hc.ack <- struct{}{}: case hc.ack <- struct{}{}:

5
relay/messages/doc.go Normal file
View File

@ -0,0 +1,5 @@
/*
Package messages provides the message types that are used to communicate between the relay and the client.
This package is used to determine the type of message that is being sent and received between the relay and the client.
*/
package messages

View File

@ -15,6 +15,7 @@ var (
prefix = []byte("sha-") // 4 bytes prefix = []byte("sha-") // 4 bytes
) )
// HashID generates a sha256 hash from the peerID and returns the hash and the human-readable string
func HashID(peerID string) ([]byte, string) { func HashID(peerID string) ([]byte, string) {
idHash := sha256.Sum256([]byte(peerID)) idHash := sha256.Sum256([]byte(peerID))
idHashString := string(prefix) + base64.StdEncoding.EncodeToString(idHash[:]) idHashString := string(prefix) + base64.StdEncoding.EncodeToString(idHash[:])
@ -24,6 +25,7 @@ func HashID(peerID string) ([]byte, string) {
return prefixedHash, idHashString return prefixedHash, idHashString
} }
// HashIDToString converts a hash to a human-readable string
func HashIDToString(idHash []byte) string { func HashIDToString(idHash []byte) string {
return fmt.Sprintf("%s%s", idHash[:prefixLength], base64.StdEncoding.EncodeToString(idHash[prefixLength:])) return fmt.Sprintf("%s%s", idHash[:prefixLength], base64.StdEncoding.EncodeToString(idHash[prefixLength:]))
} }

View File

@ -54,6 +54,7 @@ type HelloResponse struct {
InstanceAddress string InstanceAddress string
} }
// DetermineClientMsgType determines the message type from the first byte of the message
func DetermineClientMsgType(msg []byte) (MsgType, error) { func DetermineClientMsgType(msg []byte) (MsgType, error) {
msgType := MsgType(msg[0]) msgType := MsgType(msg[0])
switch msgType { switch msgType {
@ -70,6 +71,7 @@ func DetermineClientMsgType(msg []byte) (MsgType, error) {
} }
} }
// DetermineServerMsgType determines the message type from the first byte of the message
func DetermineServerMsgType(msg []byte) (MsgType, error) { func DetermineServerMsgType(msg []byte) (MsgType, error) {
msgType := MsgType(msg[0]) msgType := MsgType(msg[0])
switch msgType { switch msgType {
@ -87,6 +89,10 @@ func DetermineServerMsgType(msg []byte) (MsgType, error) {
} }
// MarshalHelloMsg initial hello message // MarshalHelloMsg initial hello message
// The Hello message is the first message sent by a client after establishing a connection with the Relay server. This
// message is used to authenticate the client with the server. The authentication is done using an HMAC method.
// The protocol does not limit to use HMAC, it can be any other method. If the authentication failed the server will
// close the network connection without any response.
func MarshalHelloMsg(peerID []byte, additions []byte) ([]byte, error) { func MarshalHelloMsg(peerID []byte, additions []byte) ([]byte, error) {
if len(peerID) != IDSize { if len(peerID) != IDSize {
return nil, fmt.Errorf("invalid peerID length: %d", len(peerID)) return nil, fmt.Errorf("invalid peerID length: %d", len(peerID))
@ -101,6 +107,8 @@ func MarshalHelloMsg(peerID []byte, additions []byte) ([]byte, error) {
return msg, nil return msg, nil
} }
// UnmarshalHelloMsg extracts the peerID and the additional data from the hello message. The Additional data is used to
// authenticate the client with the server.
func UnmarshalHelloMsg(msg []byte) ([]byte, []byte, error) { func UnmarshalHelloMsg(msg []byte) ([]byte, []byte, error) {
if len(msg) < headerSizeHello { if len(msg) < headerSizeHello {
return nil, nil, fmt.Errorf("invalid 'hello' messge") return nil, nil, fmt.Errorf("invalid 'hello' messge")
@ -111,6 +119,10 @@ func UnmarshalHelloMsg(msg []byte) ([]byte, []byte, error) {
return msg[5 : 5+IDSize], msg[headerSizeHello:], nil return msg[5 : 5+IDSize], msg[headerSizeHello:], nil
} }
// MarshalHelloResponse creates a response message to the hello message.
// In case of success connection the server response with a Hello Response message. This message contains the server's
// instance URL. This URL will be used by choose the common Relay server in case if the peers are in different Relay
// servers.
func MarshalHelloResponse(DomainAddress string) ([]byte, error) { func MarshalHelloResponse(DomainAddress string) ([]byte, error) {
payload := HelloResponse{ payload := HelloResponse{
InstanceAddress: DomainAddress, InstanceAddress: DomainAddress,
@ -131,6 +143,7 @@ func MarshalHelloResponse(DomainAddress string) ([]byte, error) {
return msg, nil return msg, nil
} }
// UnmarshalHelloResponse extracts the instance address from the hello response message
func UnmarshalHelloResponse(msg []byte) (string, error) { func UnmarshalHelloResponse(msg []byte) (string, error) {
if len(msg) < 2 { if len(msg) < 2 {
return "", fmt.Errorf("invalid 'hello response' message") return "", fmt.Errorf("invalid 'hello response' message")
@ -147,16 +160,18 @@ func UnmarshalHelloResponse(msg []byte) (string, error) {
return payload.InstanceAddress, nil return payload.InstanceAddress, nil
} }
// Close message // MarshalCloseMsg creates a close message.
// The close message is used to close the connection gracefully between the client and the server. The server and the
// client can send this message. After receiving this message, the server or client will close the connection.
func MarshalCloseMsg() []byte { func MarshalCloseMsg() []byte {
msg := make([]byte, 1) msg := make([]byte, 1)
msg[0] = byte(MsgTypeClose) msg[0] = byte(MsgTypeClose)
return msg return msg
} }
// Transport message // MarshalTransportMsg creates a transport message.
// The transport message is used to exchange data between peers. The message contains the data to be exchanged and the
// destination peer hashed ID.
func MarshalTransportMsg(peerID []byte, payload []byte) ([]byte, error) { func MarshalTransportMsg(peerID []byte, payload []byte) ([]byte, error) {
if len(peerID) != IDSize { if len(peerID) != IDSize {
return nil, fmt.Errorf("invalid peerID length: %d", len(peerID)) return nil, fmt.Errorf("invalid peerID length: %d", len(peerID))
@ -169,6 +184,7 @@ func MarshalTransportMsg(peerID []byte, payload []byte) ([]byte, error) {
return msg, nil return msg, nil
} }
// UnmarshalTransportMsg extracts the peerID and the payload from the transport message.
func UnmarshalTransportMsg(buf []byte) ([]byte, []byte, error) { func UnmarshalTransportMsg(buf []byte) ([]byte, []byte, error) {
if len(buf) < headerSizeTransport { if len(buf) < headerSizeTransport {
return nil, nil, ErrInvalidMessageLength return nil, nil, ErrInvalidMessageLength
@ -177,6 +193,7 @@ func UnmarshalTransportMsg(buf []byte) ([]byte, []byte, error) {
return buf[1:headerSizeTransport], buf[headerSizeTransport:], nil return buf[1:headerSizeTransport], buf[headerSizeTransport:], nil
} }
// UnmarshalTransportID extracts the peerID from the transport message.
func UnmarshalTransportID(buf []byte) ([]byte, error) { func UnmarshalTransportID(buf []byte) ([]byte, error) {
if len(buf) < headerSizeTransport { if len(buf) < headerSizeTransport {
log.Debugf("invalid message length: %d, expected: %d, %x", len(buf), headerSizeTransport, buf) log.Debugf("invalid message length: %d, expected: %d, %x", len(buf), headerSizeTransport, buf)
@ -185,6 +202,9 @@ func UnmarshalTransportID(buf []byte) ([]byte, error) {
return buf[1:headerSizeTransport], nil return buf[1:headerSizeTransport], nil
} }
// UpdateTransportMsg updates the peerID in the transport message.
// With this function the server can reuse the given byte slice to update the peerID in the transport message. So do
// need to allocate a new byte slice.
func UpdateTransportMsg(msg []byte, peerID []byte) error { func UpdateTransportMsg(msg []byte, peerID []byte) error {
if len(msg) < 1+len(peerID) { if len(msg) < 1+len(peerID) {
return ErrInvalidMessageLength return ErrInvalidMessageLength
@ -193,8 +213,9 @@ func UpdateTransportMsg(msg []byte, peerID []byte) error {
return nil return nil
} }
// health check message // MarshalHealthcheck creates a health check message.
// Health check message is sent by the server periodically. The client will respond with a health check response
// message. If the client does not respond to the health check message, the server will close the connection.
func MarshalHealthcheck() []byte { func MarshalHealthcheck() []byte {
return healthCheckMsg return healthCheckMsg
} }

View File

@ -18,6 +18,7 @@ const (
bufferSize = 8820 bufferSize = 8820
) )
// Peer represents a peer connection
type Peer struct { type Peer struct {
log *log.Entry log *log.Entry
idS string idS string
@ -27,6 +28,7 @@ type Peer struct {
store *Store store *Store
} }
// NewPeer creates a new Peer instance and prepare custom logging
func NewPeer(id []byte, conn net.Conn, store *Store) *Peer { func NewPeer(id []byte, conn net.Conn, store *Store) *Peer {
stringID := messages.HashIDToString(id) stringID := messages.HashIDToString(id)
return &Peer{ return &Peer{
@ -38,6 +40,9 @@ func NewPeer(id []byte, conn net.Conn, store *Store) *Peer {
} }
} }
// Work reads data from the connection
// It manages the protocol (healthcheck, transport, close). Read the message and determine the message type and handle
// the message accordingly.
func (p *Peer) Work() { func (p *Peer) Work() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
hc := healthcheck.NewSender(ctx) hc := healthcheck.NewSender(ctx)
@ -75,13 +80,14 @@ func (p *Peer) Work() {
} }
// Write writes data to the connection // Write writes data to the connection
// it has been called by the remote peer
func (p *Peer) Write(b []byte) (int, error) { func (p *Peer) Write(b []byte) (int, error) {
p.connMu.RLock() p.connMu.RLock()
defer p.connMu.RUnlock() defer p.connMu.RUnlock()
return p.conn.Write(b) return p.conn.Write(b)
} }
// CloseGracefully closes the connection with the peer gracefully. Send a close message to the client and close the
// connection.
func (p *Peer) CloseGracefully(ctx context.Context) { func (p *Peer) CloseGracefully(ctx context.Context) {
p.connMu.Lock() p.connMu.Lock()
_, err := p.writeWithTimeout(ctx, messages.MarshalCloseMsg()) _, err := p.writeWithTimeout(ctx, messages.MarshalCloseMsg())
@ -97,6 +103,7 @@ func (p *Peer) CloseGracefully(ctx context.Context) {
defer p.connMu.Unlock() defer p.connMu.Unlock()
} }
// String returns the peer ID
func (p *Peer) String() string { func (p *Peer) String() string {
return p.idS return p.idS
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"net/url"
"sync" "sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -14,6 +15,7 @@ import (
"github.com/netbirdio/netbird/relay/metrics" "github.com/netbirdio/netbird/relay/metrics"
) )
// Relay represents the relay server
type Relay struct { type Relay struct {
metrics *metrics.Metrics metrics *metrics.Metrics
validator auth.Validator validator auth.Validator
@ -25,6 +27,21 @@ type Relay struct {
closeMu sync.RWMutex closeMu sync.RWMutex
} }
// NewRelay creates a new Relay instance
//
// Parameters:
// meter: An instance of metric.Meter from the go.opentelemetry.io/otel/metric package. It is used to create and manage
// metrics for the relay server.
// exposedAddress: A string representing the address that the relay server is exposed on. The client will use this
// address as the relay server's instance URL.
// tlsSupport: A boolean indicating whether the relay server supports TLS (Transport Layer Security) or not. The
// instance URL depends on this value.
// validator: An instance of auth.Validator from the auth package. It is used to validate the authentication of the
// peers.
//
// Returns:
// A pointer to a Relay instance and an error. If the Relay instance is successfully created, the error is nil.
// Otherwise, the error contains the details of what went wrong.
func NewRelay(meter metric.Meter, exposedAddress string, tlsSupport bool, validator auth.Validator) (*Relay, error) { func NewRelay(meter metric.Meter, exposedAddress string, tlsSupport bool, validator auth.Validator) (*Relay, error) {
m, err := metrics.NewMetrics(meter) m, err := metrics.NewMetrics(meter)
if err != nil { if err != nil {
@ -42,10 +59,15 @@ func NewRelay(meter metric.Meter, exposedAddress string, tlsSupport bool, valida
} else { } else {
r.instanceURL = fmt.Sprintf("rel://%s", exposedAddress) r.instanceURL = fmt.Sprintf("rel://%s", exposedAddress)
} }
_, err = url.ParseRequestURI(r.instanceURL)
if err != nil {
return nil, fmt.Errorf("invalid exposed address: %v", err)
}
return r, nil return r, nil
} }
// Accept start to handle a new peer connection
func (r *Relay) Accept(conn net.Conn) { func (r *Relay) Accept(conn net.Conn) {
r.closeMu.RLock() r.closeMu.RLock()
defer r.closeMu.RUnlock() defer r.closeMu.RUnlock()
@ -53,7 +75,7 @@ func (r *Relay) Accept(conn net.Conn) {
return return
} }
peerID, err := r.handShake(conn) peerID, err := r.handshake(conn)
if err != nil { if err != nil {
log.Errorf("failed to handshake with %s: %s", conn.RemoteAddr(), err) log.Errorf("failed to handshake with %s: %s", conn.RemoteAddr(), err)
cErr := conn.Close() cErr := conn.Close()
@ -75,6 +97,8 @@ func (r *Relay) Accept(conn net.Conn) {
}() }()
} }
// Close closes the relay server
// It closes the connection with all peers in gracefully and stops accepting new connections.
func (r *Relay) Close(ctx context.Context) { func (r *Relay) Close(ctx context.Context) {
log.Infof("close connection with all peers") log.Infof("close connection with all peers")
r.closeMu.Lock() r.closeMu.Lock()
@ -91,7 +115,12 @@ func (r *Relay) Close(ctx context.Context) {
r.closeMu.Unlock() r.closeMu.Unlock()
} }
func (r *Relay) handShake(conn net.Conn) ([]byte, error) { // InstanceURL returns the instance URL of the relay server
func (r *Relay) InstanceURL() string {
return r.instanceURL
}
func (r *Relay) handshake(conn net.Conn) ([]byte, error) {
buf := make([]byte, messages.MaxHandshakeSize) buf := make([]byte, messages.MaxHandshakeSize)
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
@ -127,7 +156,3 @@ func (r *Relay) handShake(conn net.Conn) ([]byte, error) {
} }
return peerID, nil return peerID, nil
} }
func (r *Relay) InstanceURL() string {
return r.instanceURL
}

View File

@ -13,16 +13,27 @@ import (
"github.com/netbirdio/netbird/relay/server/listener/ws" "github.com/netbirdio/netbird/relay/server/listener/ws"
) )
// ListenerConfig is the configuration for the listener.
// Address: the address to bind the listener to. It could be an address behind a reverse proxy.
// TLSConfig: the TLS configuration for the listener.
type ListenerConfig struct { type ListenerConfig struct {
Address string Address string
TLSConfig *tls.Config TLSConfig *tls.Config
} }
// Server is the main entry point for the relay server.
// It is the gate between the WebSocket listener and the Relay server logic.
// In a new HTTP connection, the server will accept the connection and pass it to the Relay server via the Accept method.
type Server struct { type Server struct {
relay *Relay relay *Relay
wSListener listener.Listener wSListener listener.Listener
} }
// NewServer creates a new relay server instance.
// meter: the OpenTelemetry meter
// exposedAddress: this address will be used as the instance URL. It should be a domain:port format.
// tlsSupport: if true, the server will support TLS
// authValidator: the auth validator to use for the server
func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authValidator auth.Validator) (*Server, error) { func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authValidator auth.Validator) (*Server, error) {
relay, err := NewRelay(meter, exposedAddress, tlsSupport, authValidator) relay, err := NewRelay(meter, exposedAddress, tlsSupport, authValidator)
if err != nil { if err != nil {
@ -33,6 +44,7 @@ func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authV
}, nil }, nil
} }
// Listen starts the relay server.
func (r *Server) Listen(cfg ListenerConfig) error { func (r *Server) Listen(cfg ListenerConfig) error {
r.wSListener = &ws.Listener{ r.wSListener = &ws.Listener{
Address: cfg.Address, Address: cfg.Address,
@ -47,6 +59,8 @@ func (r *Server) Listen(cfg ListenerConfig) error {
return wslErr return wslErr
} }
// Close stops the relay server. If there are active connections, they will be closed gracefully. In case of a timeout,
// the connections will be forcefully closed.
func (r *Server) Close() (err error) { func (r *Server) Close() (err error) {
// stop service new connections // stop service new connections
if r.wSListener != nil { if r.wSListener != nil {
@ -60,6 +74,7 @@ func (r *Server) Close() (err error) {
return return
} }
// InstanceURL returns the instance URL of the relay server.
func (r *Server) InstanceURL() string { func (r *Server) InstanceURL() string {
return r.relay.instanceURL return r.relay.instanceURL
} }

View File

@ -4,29 +4,36 @@ import (
"sync" "sync"
) )
// Store is a thread-safe store of peers
// It is used to store the peers that are connected to the relay server
type Store struct { type Store struct {
peers map[string]*Peer // consider to use [32]byte as key. The Peer(id string) would be faster peers map[string]*Peer // consider to use [32]byte as key. The Peer(id string) would be faster
peersLock sync.RWMutex peersLock sync.RWMutex
} }
// NewStore creates a new Store instance
func NewStore() *Store { func NewStore() *Store {
return &Store{ return &Store{
peers: make(map[string]*Peer), peers: make(map[string]*Peer),
} }
} }
// AddPeer adds a peer to the store
// It distinguishes the peers by their ID
func (s *Store) AddPeer(peer *Peer) { func (s *Store) AddPeer(peer *Peer) {
s.peersLock.Lock() s.peersLock.Lock()
defer s.peersLock.Unlock() defer s.peersLock.Unlock()
s.peers[peer.String()] = peer s.peers[peer.String()] = peer
} }
// DeletePeer deletes a peer from the store
func (s *Store) DeletePeer(peer *Peer) { func (s *Store) DeletePeer(peer *Peer) {
s.peersLock.Lock() s.peersLock.Lock()
defer s.peersLock.Unlock() defer s.peersLock.Unlock()
delete(s.peers, peer.String()) delete(s.peers, peer.String())
} }
// Peer returns a peer by its ID
func (s *Store) Peer(id string) (*Peer, bool) { func (s *Store) Peer(id string) (*Peer, bool) {
s.peersLock.RLock() s.peersLock.RLock()
defer s.peersLock.RUnlock() defer s.peersLock.RUnlock()
@ -35,6 +42,7 @@ func (s *Store) Peer(id string) (*Peer, bool) {
return p, ok return p, ok
} }
// Peers returns all the peers in the store
func (s *Store) Peers() []*Peer { func (s *Store) Peers() []*Peer {
s.peersLock.RLock() s.peersLock.RLock()
defer s.peersLock.RUnlock() defer s.peersLock.RUnlock()