netbird/relay/client/client.go

343 lines
6.8 KiB
Go
Raw Normal View History

2024-05-17 17:43:28 +02:00
package client
import (
2024-05-26 22:14:33 +02:00
"context"
2024-05-17 17:43:28 +02:00
"fmt"
2024-05-28 01:00:25 +02:00
"github.com/netbirdio/netbird/relay/client/dialer/udp"
2024-05-17 17:43:28 +02:00
"io"
"net"
"sync"
2024-05-17 17:43:28 +02:00
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/relay/messages"
)
// Fixed parameters for the relay client's network I/O.
const (
	// bufferSize is the length in bytes of each buffer the read loop
	// allocates for an incoming relay message.
	bufferSize = 1500 // optimise the buffer size

	// serverResponseTimeout is how long the handshake waits for the
	// server's reply to the hello message before the read times out.
	serverResponseTimeout = 8 * time.Second
)
2024-05-28 01:00:25 +02:00
// reconnectingTimeout is how long reconnectGuard sleeps after a failed
// reconnection attempt before trying again.
// NOTE(review): declared as a variable rather than a constant, presumably so
// tests can shorten it; confirm.
var reconnectingTimeout = 5 * time.Second
2024-05-23 13:24:02 +02:00
// Msg is the unit handed from the relay read loop to a peer connection's
// reader through a channel.
type Msg struct {
	// buf holds the raw bytes of one message exactly as read from the relay
	// connection; the reader extracts the transport payload from it.
	buf []byte
}
2024-05-17 17:43:28 +02:00
type connContainer struct {
conn *Conn
2024-05-23 13:24:02 +02:00
messages chan Msg
2024-05-17 17:43:28 +02:00
}
type Client struct {
2024-05-23 13:24:02 +02:00
log *log.Entry
2024-05-26 22:14:33 +02:00
ctx context.Context
ctxCancel context.CancelFunc
2024-05-17 17:43:28 +02:00
serverAddress string
2024-05-23 13:24:02 +02:00
hashedID []byte
2024-05-17 17:43:28 +02:00
2024-05-28 01:27:53 +02:00
readyToOpenConns bool
conns map[string]*connContainer
connsMutext sync.Mutex // protect conns and readyToOpenConns bool
relayConn net.Conn
serviceIsRunning bool
serviceIsRunningMutex sync.Mutex
wgReadLoop sync.WaitGroup
onDisconnected chan struct{}
2024-05-17 17:43:28 +02:00
}
2024-05-26 22:14:33 +02:00
func NewClient(ctx context.Context, serverAddress, peerID string) *Client {
ctx, ctxCancel := context.WithCancel(ctx)
2024-05-23 13:24:02 +02:00
hashedID, hashedStringId := messages.HashID(peerID)
2024-05-17 17:43:28 +02:00
return &Client{
2024-05-28 01:00:25 +02:00
log: log.WithField("client_id", hashedStringId),
ctx: ctx,
ctxCancel: ctxCancel,
serverAddress: serverAddress,
hashedID: hashedID,
conns: make(map[string]*connContainer),
onDisconnected: make(chan struct{}),
2024-05-17 17:43:28 +02:00
}
}
func (c *Client) Connect() error {
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Lock()
2024-05-28 01:00:25 +02:00
if c.serviceIsRunning {
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Unlock()
2024-05-26 22:14:33 +02:00
return nil
}
2024-05-28 01:00:25 +02:00
err := c.connect()
2024-05-17 17:43:28 +02:00
if err != nil {
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Unlock()
2024-05-17 17:43:28 +02:00
return err
}
2024-05-28 01:00:25 +02:00
c.serviceIsRunning = true
2024-05-26 22:14:33 +02:00
2024-05-28 01:27:53 +02:00
c.wgReadLoop.Add(1)
2024-05-27 10:25:08 +02:00
go c.readLoop()
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Unlock()
2024-05-28 01:00:25 +02:00
2024-05-26 22:14:33 +02:00
go func() {
<-c.ctx.Done()
cErr := c.close()
if cErr != nil {
log.Errorf("failed to close relay connection: %s", cErr)
}
}()
2024-05-28 01:00:25 +02:00
go c.reconnectGuard()
2024-05-17 17:43:28 +02:00
return nil
}
2024-05-28 01:00:25 +02:00
func (c *Client) reconnectGuard() {
for {
2024-05-28 01:27:53 +02:00
c.wgReadLoop.Wait()
2024-05-28 01:00:25 +02:00
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Lock()
2024-05-28 01:00:25 +02:00
if !c.serviceIsRunning {
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Unlock()
2024-05-28 01:00:25 +02:00
return
}
log.Infof("reconnecting to relay server")
err := c.connect()
if err != nil {
log.Errorf("failed to reconnect to relay server: %s", err)
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Unlock()
2024-05-28 01:00:25 +02:00
time.Sleep(reconnectingTimeout)
continue
}
log.Infof("reconnected to relay server")
2024-05-28 01:27:53 +02:00
c.wgReadLoop.Add(1)
2024-05-28 01:00:25 +02:00
go c.readLoop()
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Unlock()
2024-05-28 01:00:25 +02:00
}
}
2024-05-23 13:24:02 +02:00
func (c *Client) OpenConn(dstPeerID string) (net.Conn, error) {
2024-05-28 01:00:25 +02:00
c.connsMutext.Lock()
defer c.connsMutext.Unlock()
2024-05-28 01:27:53 +02:00
if !c.readyToOpenConns {
2024-05-26 22:14:33 +02:00
return nil, fmt.Errorf("relay connection is not established")
}
2024-05-23 13:24:02 +02:00
hashedID, hashedStringID := messages.HashID(dstPeerID)
log.Infof("open connection to peer: %s", hashedStringID)
messageBuffer := make(chan Msg, 2)
2024-05-27 10:25:08 +02:00
conn := NewConn(c, hashedID, hashedStringID, c.generateConnReaderFN(messageBuffer))
2024-05-23 13:24:02 +02:00
c.conns[hashedStringID] = &connContainer{
conn,
messageBuffer,
2024-05-17 17:43:28 +02:00
}
2024-05-23 13:24:02 +02:00
return conn, nil
2024-05-17 17:43:28 +02:00
}
func (c *Client) Close() error {
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Lock()
if !c.serviceIsRunning {
c.serviceIsRunningMutex.Unlock()
return nil
}
2024-05-26 22:14:33 +02:00
c.ctxCancel()
return c.close()
}
2024-05-28 01:00:25 +02:00
func (c *Client) connect() error {
conn, err := udp.Dial(c.serverAddress)
if err != nil {
return err
}
c.relayConn = conn
err = c.handShake()
if err != nil {
cErr := conn.Close()
if cErr != nil {
log.Errorf("failed to close connection: %s", cErr)
}
c.relayConn = nil
return err
}
2024-05-28 01:27:53 +02:00
c.readyToOpenConns = true
2024-05-28 01:00:25 +02:00
return nil
}
2024-05-26 22:14:33 +02:00
func (c *Client) close() error {
2024-05-28 01:27:53 +02:00
c.serviceIsRunningMutex.Lock()
defer c.serviceIsRunningMutex.Unlock()
2024-05-28 01:00:25 +02:00
if !c.serviceIsRunning {
return nil
}
2024-05-28 01:00:25 +02:00
c.serviceIsRunning = false
2024-05-26 22:14:33 +02:00
2024-05-17 17:43:28 +02:00
err := c.relayConn.Close()
2024-05-26 22:14:33 +02:00
2024-05-28 01:27:53 +02:00
c.wgReadLoop.Wait()
2024-05-27 10:25:08 +02:00
2024-05-17 17:43:28 +02:00
return err
}
func (c *Client) handShake() error {
2024-05-26 22:14:33 +02:00
defer func() {
err := c.relayConn.SetReadDeadline(time.Time{})
if err != nil {
log.Errorf("failed to reset read deadline: %s", err)
}
}()
2024-05-23 13:24:02 +02:00
msg, err := messages.MarshalHelloMsg(c.hashedID)
2024-05-17 17:43:28 +02:00
if err != nil {
2024-05-23 13:24:02 +02:00
log.Errorf("failed to marshal hello message: %s", err)
2024-05-17 17:43:28 +02:00
return err
}
_, err = c.relayConn.Write(msg)
if err != nil {
log.Errorf("failed to send hello message: %s", err)
return err
}
err = c.relayConn.SetReadDeadline(time.Now().Add(serverResponseTimeout))
if err != nil {
log.Errorf("failed to set read deadline: %s", err)
return err
}
buf := make([]byte, 1500) // todo: optimise buffer size
n, err := c.relayConn.Read(buf)
if err != nil {
log.Errorf("failed to read hello response: %s", err)
return err
}
msgType, err := messages.DetermineServerMsgType(buf[:n])
if err != nil {
log.Errorf("failed to determine message type: %s", err)
return err
}
if msgType != messages.MsgTypeHelloResponse {
log.Errorf("unexpected message type: %s", msgType)
return fmt.Errorf("unexpected message type")
}
2024-05-17 17:43:28 +02:00
return nil
}
func (c *Client) readLoop() {
var errExit error
var n int
for {
2024-05-23 13:24:02 +02:00
buf := make([]byte, bufferSize)
2024-05-17 17:43:28 +02:00
n, errExit = c.relayConn.Read(buf)
if errExit != nil {
2024-05-28 01:00:25 +02:00
if c.serviceIsRunning {
2024-05-23 13:24:02 +02:00
c.log.Debugf("failed to read message from relay server: %s", errExit)
}
2024-05-17 17:43:28 +02:00
break
}
msgType, err := messages.DetermineServerMsgType(buf[:n])
if err != nil {
2024-05-23 13:24:02 +02:00
c.log.Errorf("failed to determine message type: %s", err)
2024-05-17 17:43:28 +02:00
continue
}
switch msgType {
case messages.MsgTypeTransport:
2024-05-23 13:24:02 +02:00
peerID, err := messages.UnmarshalTransportID(buf[:n])
2024-05-17 17:43:28 +02:00
if err != nil {
2024-05-23 13:24:02 +02:00
c.log.Errorf("failed to parse transport message: %v", err)
2024-05-17 17:43:28 +02:00
continue
}
2024-05-23 13:24:02 +02:00
stringID := messages.HashIDToString(peerID)
container, ok := c.conns[stringID]
2024-05-21 16:21:29 +02:00
if !ok {
2024-05-23 13:24:02 +02:00
c.log.Errorf("peer not found: %s", stringID)
continue
2024-05-21 16:21:29 +02:00
}
2024-05-23 13:24:02 +02:00
container.messages <- Msg{
buf[:n],
}
2024-05-17 17:43:28 +02:00
}
}
2024-05-28 01:00:25 +02:00
if c.serviceIsRunning {
2024-05-17 17:43:28 +02:00
_ = c.relayConn.Close()
}
2024-05-28 01:00:25 +02:00
c.connsMutext.Lock()
2024-05-28 01:27:53 +02:00
c.readyToOpenConns = false
2024-05-28 01:00:25 +02:00
for _, container := range c.conns {
close(container.messages)
}
c.conns = make(map[string]*connContainer)
c.connsMutext.Unlock()
c.log.Tracef("exit from read loop")
2024-05-28 01:27:53 +02:00
c.wgReadLoop.Done()
2024-05-17 17:43:28 +02:00
}
2024-05-27 10:25:08 +02:00
func (c *Client) writeTo(id string, dstID []byte, payload []byte) (int, error) {
2024-05-28 01:27:53 +02:00
c.connsMutext.Lock()
2024-05-27 10:25:08 +02:00
_, ok := c.conns[id]
if !ok {
2024-05-28 01:27:53 +02:00
c.connsMutext.Unlock()
2024-05-27 10:25:08 +02:00
return 0, io.EOF
}
2024-05-28 01:27:53 +02:00
c.connsMutext.Unlock()
2024-05-23 13:24:02 +02:00
msg := messages.MarshalTransportMsg(dstID, payload)
2024-05-17 17:43:28 +02:00
n, err := c.relayConn.Write(msg)
if err != nil {
log.Errorf("failed to write transport message: %s", err)
}
return n, err
}
2024-05-23 13:24:02 +02:00
func (c *Client) generateConnReaderFN(msgChannel chan Msg) func(b []byte) (n int, err error) {
2024-05-17 17:43:28 +02:00
return func(b []byte) (n int, err error) {
2024-05-23 13:24:02 +02:00
msg, ok := <-msgChannel
if !ok {
return 0, io.EOF
}
2024-05-17 23:29:47 +02:00
2024-05-23 13:24:02 +02:00
payload, err := messages.UnmarshalTransportPayload(msg.buf)
if err != nil {
return 0, err
2024-05-17 17:43:28 +02:00
}
2024-05-23 13:24:02 +02:00
n = copy(b, payload)
2024-05-17 17:43:28 +02:00
return n, nil
}
}
2024-05-27 10:25:08 +02:00
func (c *Client) closeConn(id string) error {
2024-05-28 01:00:25 +02:00
c.connsMutext.Lock()
defer c.connsMutext.Unlock()
2024-05-27 10:25:08 +02:00
conn, ok := c.conns[id]
if !ok {
return fmt.Errorf("connection already closed")
}
close(conn.messages)
delete(c.conns, id)
return nil
}