netbird/relay/client/client.go

524 lines
12 KiB
Go
Raw Normal View History

2024-05-17 17:43:28 +02:00
package client
import (
2024-05-26 22:14:33 +02:00
"context"
2024-05-17 17:43:28 +02:00
"fmt"
"io"
"net"
"sync"
2024-05-17 17:43:28 +02:00
"time"
log "github.com/sirupsen/logrus"
2024-07-05 16:12:30 +02:00
auth "github.com/netbirdio/netbird/relay/auth/hmac"
2024-06-28 11:17:06 +02:00
"github.com/netbirdio/netbird/relay/client/dialer/ws"
2024-06-29 14:13:05 +02:00
"github.com/netbirdio/netbird/relay/healthcheck"
2024-05-17 17:43:28 +02:00
"github.com/netbirdio/netbird/relay/messages"
)
const (
	// bufferSize is the length, in bytes, of every buffer created by the client's buffer pool.
	bufferSize = 8820
	// serverResponseTimeout bounds how long the client waits for the relay server's
	// response while reading with a timeout (used during the handshake).
	serverResponseTimeout = 8 * time.Second
)

// ErrConnAlreadyExists is returned by OpenConn when a connection to the requested peer is already registered.
var ErrConnAlreadyExists = fmt.Errorf("connection already exists")
2024-06-29 14:13:05 +02:00
// internalStopFlag is a mutex-guarded boolean. The client sets it when it closes the relay
// connection on its own initiative (health check timeout), so the read loop can tell such a
// shutdown apart from an unexpected read failure.
type internalStopFlag struct {
	sync.Mutex
	stop bool
}

// newInternalStopFlag returns a flag that is initially not set.
func newInternalStopFlag() *internalStopFlag {
	return new(internalStopFlag)
}

// set marks the flag. It is safe to call from multiple goroutines and more than once.
func (isf *internalStopFlag) set() {
	isf.Lock()
	isf.stop = true
	isf.Unlock()
}

// isSet reports whether set has been called.
func (isf *internalStopFlag) isSet() bool {
	isf.Lock()
	stopped := isf.stop
	isf.Unlock()
	return stopped
}
// Msg carries a payload from the relay server to a peer connection. The message keeps a
// reference to the pooled buffer it was read into (Payload is expected to alias that buffer),
// so the consumer must call Free once it no longer needs Payload.
type Msg struct {
	Payload []byte

	bufPool *sync.Pool
	bufPtr  *[]byte
}

// Free hands the underlying buffer back to the pool. Payload must not be used afterwards.
//
// Free is a no-op on a nil Msg, on a Msg that has no pooled buffer attached, and on a Msg
// that was already freed, so the same buffer is never put into the pool twice through one Msg.
func (m *Msg) Free() {
	if m == nil || m.bufPool == nil || m.bufPtr == nil {
		return
	}
	m.bufPool.Put(m.bufPtr)
	// Drop the reference so a repeated Free cannot recycle the buffer a second time.
	m.bufPtr = nil
}
2024-05-17 17:43:28 +02:00
// connContainer pairs a peer connection with the channel through which inbound messages are
// delivered to it.
type connContainer struct {
	conn     *Conn
	messages chan Msg

	// msgChanLock serializes sends on messages with closing the channel; it guards closed.
	msgChanLock sync.Mutex
	closed      bool // flag to check if channel is closed
}

// newConnContainer wraps conn together with its message channel. The container starts open.
func newConnContainer(conn *Conn, messages chan Msg) *connContainer {
	cc := new(connContainer)
	cc.conn = conn
	cc.messages = messages
	return cc
}
// writeMsg delivers msg to the connection's message channel. The send happens while holding
// msgChanLock, so it can never hit a closed channel; it blocks while the channel is full.
//
// If the container is already closed the message is dropped and its buffer is returned to
// the pool, because no receiver will ever get the chance to free it.
func (cc *connContainer) writeMsg(msg Msg) {
	cc.msgChanLock.Lock()
	defer cc.msgChanLock.Unlock()

	if cc.closed {
		msg.Free()
		return
	}
	cc.messages <- msg
}
func (cc *connContainer) close() {
cc.msgChanLock.Lock()
defer cc.msgChanLock.Unlock()
if cc.closed {
return
}
close(cc.messages)
cc.closed = true
2024-05-17 17:43:28 +02:00
}
2024-06-03 11:22:16 +02:00
// Client is a client for the relay server. It is responsible for establishing a connection to the relay server and
// managing connections to other peers. All exported functions are safe to call concurrently. After close the connection,
// the client can be reused by calling Connect again. When the client is closed, all connections are closed too.
2024-07-09 16:46:43 +02:00
// While the Connect is in progress, the OpenConn function will block until the connection is established with relay server.
2024-05-17 17:43:28 +02:00
type Client struct {
2024-07-08 17:01:11 +02:00
log *log.Entry
parentCtx context.Context
connectionURL string
authTokenStore *auth.TokenStore
hashedID []byte
2024-05-17 17:43:28 +02:00
bufPool *sync.Pool
2024-06-03 00:29:08 +02:00
relayConn net.Conn
2024-05-28 01:27:53 +02:00
conns map[string]*connContainer
2024-06-03 00:29:08 +02:00
serviceIsRunning bool
2024-06-29 14:13:05 +02:00
mu sync.Mutex // protect serviceIsRunning and conns
2024-06-03 00:29:08 +02:00
readLoopMutex sync.Mutex
wgReadLoop sync.WaitGroup
instanceURL *RelayAddr
2024-07-10 22:33:15 +02:00
muInstanceURL sync.Mutex
onDisconnectListener func()
listenerMutex sync.Mutex
2024-05-17 17:43:28 +02:00
}
2024-06-03 11:22:16 +02:00
// NewClient creates a new client for the relay server. The client is not connected to the server until the Connect
2024-07-08 17:01:11 +02:00
func NewClient(ctx context.Context, serverURL string, authTokenStore *auth.TokenStore, peerID string) *Client {
2024-05-23 13:24:02 +02:00
hashedID, hashedStringId := messages.HashID(peerID)
2024-05-17 17:43:28 +02:00
return &Client{
2024-07-08 17:01:11 +02:00
log: log.WithField("client_id", hashedStringId),
parentCtx: ctx,
connectionURL: serverURL,
authTokenStore: authTokenStore,
hashedID: hashedID,
bufPool: &sync.Pool{
New: func() any {
buf := make([]byte, bufferSize)
return &buf
},
},
conns: make(map[string]*connContainer),
2024-05-17 17:43:28 +02:00
}
}
2024-06-03 11:22:16 +02:00
// Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs.
func (c *Client) Connect() error {
2024-07-02 11:57:17 +02:00
c.log.Infof("connecting to relay server: %s", c.connectionURL)
2024-06-03 00:29:08 +02:00
c.readLoopMutex.Lock()
defer c.readLoopMutex.Unlock()
c.mu.Lock()
2024-06-03 20:14:39 +02:00
defer c.mu.Unlock()
2024-05-29 16:40:26 +02:00
if c.serviceIsRunning {
return nil
}
2024-05-28 01:00:25 +02:00
2024-05-29 16:40:26 +02:00
err := c.connect()
if err != nil {
return err
}
2024-05-28 01:00:25 +02:00
2024-05-29 16:40:26 +02:00
c.serviceIsRunning = true
2024-05-28 01:00:25 +02:00
2024-06-03 00:29:08 +02:00
c.wgReadLoop.Add(1)
go c.readLoop(c.relayConn)
2024-05-29 16:40:26 +02:00
2024-07-02 11:57:17 +02:00
log.Infof("relay connection established with: %s", c.connectionURL)
2024-05-29 16:40:26 +02:00
return nil
2024-05-28 01:00:25 +02:00
}
2024-06-12 10:56:21 +02:00
// OpenConn create a new net.Conn for the destination peer ID. In case if the connection is in progress
// to the relay server, the function will block until the connection is established or timed out. Otherwise,
// it will return immediately.
// todo: what should happen if call with the same peerID with multiple times?
2024-05-23 13:24:02 +02:00
func (c *Client) OpenConn(dstPeerID string) (net.Conn, error) {
2024-06-03 00:29:08 +02:00
c.mu.Lock()
defer c.mu.Unlock()
2024-05-28 01:00:25 +02:00
2024-06-03 00:29:08 +02:00
if !c.serviceIsRunning {
2024-05-26 22:14:33 +02:00
return nil, fmt.Errorf("relay connection is not established")
}
2024-05-23 13:24:02 +02:00
hashedID, hashedStringID := messages.HashID(dstPeerID)
_, ok := c.conns[hashedStringID]
if ok {
return nil, ErrConnAlreadyExists
}
2024-05-23 13:24:02 +02:00
log.Infof("open connection to peer: %s", hashedStringID)
msgChannel := make(chan Msg, 2)
conn := NewConn(c, hashedID, hashedStringID, msgChannel, c.instanceURL)
c.conns[hashedStringID] = newConnContainer(conn, msgChannel)
2024-05-23 13:24:02 +02:00
return conn, nil
2024-05-17 17:43:28 +02:00
}
2024-07-02 11:57:17 +02:00
// ServerInstanceURL returns the address of the relay server. It could change after the close and reopen the connection.
func (c *Client) ServerInstanceURL() (string, error) {
2024-07-10 22:33:15 +02:00
c.muInstanceURL.Lock()
defer c.muInstanceURL.Unlock()
if c.instanceURL == nil {
2024-07-02 11:57:17 +02:00
return "", fmt.Errorf("relay connection is not established")
2024-05-29 16:40:26 +02:00
}
return c.instanceURL.String(), nil
2024-05-29 16:40:26 +02:00
}
// SetOnDisconnectListener registers fn to be invoked when the connection to the relay server is closed.
// A later call replaces the previously registered function.
func (c *Client) SetOnDisconnectListener(fn func()) {
	c.listenerMutex.Lock()
	c.onDisconnectListener = fn
	c.listenerMutex.Unlock()
}
2024-07-09 16:44:12 +02:00
// HasConns reports whether at least one peer connection is currently registered.
func (c *Client) HasConns() bool {
	c.mu.Lock()
	connCount := len(c.conns)
	c.mu.Unlock()
	return connCount != 0
}
2024-06-03 11:22:16 +02:00
// Close closes the connection to the relay server and all connections to other peers.
2024-05-17 17:43:28 +02:00
func (c *Client) Close() error {
2024-06-29 14:13:05 +02:00
return c.close(true)
2024-06-05 19:49:30 +02:00
}
2024-05-28 01:00:25 +02:00
func (c *Client) connect() error {
2024-07-02 11:57:17 +02:00
conn, err := ws.Dial(c.connectionURL)
2024-05-28 01:00:25 +02:00
if err != nil {
return err
}
c.relayConn = conn
err = c.handShake()
if err != nil {
cErr := conn.Close()
if cErr != nil {
log.Errorf("failed to close connection: %s", cErr)
}
c.relayConn = nil
return err
}
return nil
}
2024-05-17 17:43:28 +02:00
func (c *Client) handShake() error {
2024-07-10 13:21:50 +02:00
tb := c.authTokenStore.TokenBinary()
2024-07-05 16:12:30 +02:00
2024-07-10 13:21:50 +02:00
msg, err := messages.MarshalHelloMsg(c.hashedID, tb)
2024-05-17 17:43:28 +02:00
if err != nil {
2024-05-23 13:24:02 +02:00
log.Errorf("failed to marshal hello message: %s", err)
2024-05-17 17:43:28 +02:00
return err
}
_, err = c.relayConn.Write(msg)
if err != nil {
log.Errorf("failed to send hello message: %s", err)
return err
}
2024-06-26 16:22:26 +02:00
buf := make([]byte, messages.MaxHandshakeSize)
n, err := c.readWithTimeout(buf)
if err != nil {
log.Errorf("failed to read hello response: %s", err)
return err
}
msgType, err := messages.DetermineServerMsgType(buf[:n])
if err != nil {
log.Errorf("failed to determine message type: %s", err)
return err
}
if msgType != messages.MsgTypeHelloResponse {
log.Errorf("unexpected message type: %s", msgType)
return fmt.Errorf("unexpected message type")
}
2024-07-02 11:57:17 +02:00
2024-07-05 16:12:30 +02:00
ia, err := messages.UnmarshalHelloResponse(buf[:n])
2024-07-02 11:57:17 +02:00
if err != nil {
return err
}
2024-07-10 22:33:15 +02:00
c.muInstanceURL.Lock()
c.instanceURL = &RelayAddr{addr: ia}
2024-07-10 22:33:15 +02:00
c.muInstanceURL.Unlock()
2024-05-17 17:43:28 +02:00
return nil
}
2024-06-03 00:29:08 +02:00
func (c *Client) readLoop(relayConn net.Conn) {
2024-06-29 14:13:05 +02:00
internallyStoppedFlag := newInternalStopFlag()
hc := healthcheck.NewReceiver()
go c.listenForStopEvents(hc, relayConn, internallyStoppedFlag)
2024-06-05 19:49:30 +02:00
var (
2024-06-29 14:13:05 +02:00
errExit error
n int
2024-06-05 19:49:30 +02:00
)
2024-05-17 17:43:28 +02:00
for {
bufPtr := c.bufPool.Get().(*[]byte)
buf := *bufPtr
2024-06-03 00:29:08 +02:00
n, errExit = relayConn.Read(buf)
2024-05-17 17:43:28 +02:00
if errExit != nil {
2024-06-03 00:29:08 +02:00
c.mu.Lock()
2024-06-29 14:13:05 +02:00
if c.serviceIsRunning && !internallyStoppedFlag.isSet() {
2024-05-23 13:24:02 +02:00
c.log.Debugf("failed to read message from relay server: %s", errExit)
}
2024-06-03 00:29:08 +02:00
c.mu.Unlock()
2024-07-09 16:15:25 +02:00
break
2024-05-17 17:43:28 +02:00
}
msgType, err := messages.DetermineServerMsgType(buf[:n])
if err != nil {
2024-05-23 13:24:02 +02:00
c.log.Errorf("failed to determine message type: %s", err)
2024-05-17 17:43:28 +02:00
continue
}
2024-07-09 16:15:25 +02:00
if !c.handleMsg(msgType, buf[:n], bufPtr, hc, internallyStoppedFlag) {
break
2024-05-17 17:43:28 +02:00
}
}
2024-06-29 14:13:05 +02:00
hc.Stop()
2024-07-10 22:33:15 +02:00
c.muInstanceURL.Lock()
c.instanceURL = nil
2024-07-10 22:33:15 +02:00
c.muInstanceURL.Unlock()
c.notifyDisconnected()
2024-05-28 01:27:53 +02:00
c.wgReadLoop.Done()
2024-06-29 14:13:05 +02:00
_ = c.close(false)
2024-05-17 17:43:28 +02:00
}
2024-07-09 16:15:25 +02:00
func (c *Client) handleMsg(msgType messages.MsgType, buf []byte, bufPtr *[]byte, hc *healthcheck.Receiver, internallyStoppedFlag *internalStopFlag) (continueLoop bool) {
switch msgType {
case messages.MsgTypeHealthCheck:
2024-07-09 16:38:50 +02:00
c.handleHealthCheck(hc, internallyStoppedFlag)
c.bufPool.Put(bufPtr)
2024-07-09 16:15:25 +02:00
case messages.MsgTypeTransport:
2024-07-09 16:38:50 +02:00
return c.handleTransportMsg(buf, bufPtr, internallyStoppedFlag)
2024-07-09 16:15:25 +02:00
case messages.MsgTypeClose:
log.Debugf("relay connection close by server")
c.bufPool.Put(bufPtr)
return false
}
return true
}
2024-07-09 16:38:50 +02:00
func (c *Client) handleHealthCheck(hc *healthcheck.Receiver, internallyStoppedFlag *internalStopFlag) {
2024-07-09 16:27:20 +02:00
msg := messages.MarshalHealthcheck()
_, wErr := c.relayConn.Write(msg)
if wErr != nil {
if c.serviceIsRunning && !internallyStoppedFlag.isSet() {
c.log.Errorf("failed to send heartbeat: %s", wErr)
}
}
hc.Heartbeat()
}
2024-07-09 16:38:50 +02:00
func (c *Client) handleTransportMsg(buf []byte, bufPtr *[]byte, internallyStoppedFlag *internalStopFlag) bool {
2024-07-09 16:15:25 +02:00
peerID, payload, err := messages.UnmarshalTransportMsg(buf)
if err != nil {
if c.serviceIsRunning && !internallyStoppedFlag.isSet() {
c.log.Errorf("failed to parse transport message: %v", err)
}
c.bufPool.Put(bufPtr)
return true
}
stringID := messages.HashIDToString(peerID)
c.mu.Lock()
if !c.serviceIsRunning {
c.mu.Unlock()
c.bufPool.Put(bufPtr)
return false
}
container, ok := c.conns[stringID]
c.mu.Unlock()
if !ok {
c.log.Errorf("peer not found: %s", stringID)
c.bufPool.Put(bufPtr)
return true
}
msg := Msg{
bufPool: c.bufPool,
bufPtr: bufPtr,
Payload: payload,
}
container.writeMsg(msg)
return true
}
2024-07-18 13:16:50 +02:00
func (c *Client) writeTo(connReference *Conn, id string, dstID []byte, payload []byte) (int, error) {
2024-06-03 00:29:08 +02:00
c.mu.Lock()
2024-07-18 13:16:50 +02:00
conn, ok := c.conns[id]
2024-06-03 00:29:08 +02:00
c.mu.Unlock()
2024-05-27 10:25:08 +02:00
if !ok {
return 0, io.EOF
}
2024-07-18 13:16:50 +02:00
if conn.conn != connReference {
return 0, io.EOF
}
2024-06-26 15:26:19 +02:00
// todo: use buffer pool instead of create new transport msg.
msg, err := messages.MarshalTransportMsg(dstID, payload)
if err != nil {
log.Errorf("failed to marshal transport message: %s", err)
return 0, err
}
2024-07-22 13:13:12 +02:00
// the write always return with 0 length because the underling does not support the size feedback.
_, err = c.relayConn.Write(msg)
2024-05-17 17:43:28 +02:00
if err != nil {
log.Errorf("failed to write transport message: %s", err)
}
2024-07-22 13:13:12 +02:00
return len(payload), err
2024-05-17 17:43:28 +02:00
}
2024-06-29 14:13:05 +02:00
// listenForStopEvents waits for the first of two events and then returns:
//   - a health check timeout: the internal stop flag is set and conn is closed, which makes the read
//     loop's pending Read fail; if the OnTimeout channel was closed instead, nothing is done;
//   - cancellation of the parent context: the client is closed gracefully.
func (c *Client) listenForStopEvents(hc *healthcheck.Receiver, conn net.Conn, internalStopFlag *internalStopFlag) {
	select {
	case _, ok := <-hc.OnTimeout:
		if ok {
			c.log.Errorf("health check timeout")
			internalStopFlag.set()
			_ = conn.Close() // ignore the err because the readLoop will handle it
		}
	case <-c.parentCtx.Done():
		if err := c.close(true); err != nil {
			log.Errorf("failed to teardown connection: %s", err)
		}
	}
}
2024-06-03 00:29:08 +02:00
func (c *Client) closeAllConns() {
for _, container := range c.conns {
container.close()
2024-06-03 00:29:08 +02:00
}
c.conns = make(map[string]*connContainer)
}
2024-07-18 13:16:50 +02:00
func (c *Client) closeConn(connReference *Conn, id string) error {
2024-06-03 00:29:08 +02:00
c.mu.Lock()
defer c.mu.Unlock()
2024-05-28 01:00:25 +02:00
container, ok := c.conns[id]
2024-05-27 10:25:08 +02:00
if !ok {
return fmt.Errorf("connection already closed")
}
2024-07-18 13:16:50 +02:00
if container.conn != connReference {
return fmt.Errorf("conn reference mismatch")
}
container.close()
2024-05-27 10:25:08 +02:00
delete(c.conns, id)
return nil
}
2024-06-29 14:13:05 +02:00
func (c *Client) close(gracefullyExit bool) error {
2024-06-27 18:42:40 +02:00
c.readLoopMutex.Lock()
defer c.readLoopMutex.Unlock()
c.mu.Lock()
var err error
if !c.serviceIsRunning {
c.mu.Unlock()
return nil
}
c.serviceIsRunning = false
c.closeAllConns()
2024-06-29 14:13:05 +02:00
if gracefullyExit {
2024-06-27 18:42:40 +02:00
c.writeCloseMsg()
err = c.relayConn.Close()
}
c.mu.Unlock()
c.wgReadLoop.Wait()
2024-07-02 11:57:17 +02:00
c.log.Infof("relay connection closed with: %s", c.connectionURL)
2024-06-27 18:42:40 +02:00
return err
}
// notifyDisconnected invokes the registered disconnect listener, if any, in a new goroutine so the
// caller is never blocked by the listener.
func (c *Client) notifyDisconnected() {
	c.listenerMutex.Lock()
	defer c.listenerMutex.Unlock()

	if listener := c.onDisconnectListener; listener != nil {
		go listener()
	}
}
2024-06-05 19:49:30 +02:00
// writeCloseMsg sends a close message to the relay server. It is best effort: a failed write is only logged.
func (c *Client) writeCloseMsg() {
	closeMsg := messages.MarshalCloseMsg()
	if _, err := c.relayConn.Write(closeMsg); err != nil {
		c.log.Errorf("failed to send close message: %s", err)
	}
}
2024-06-27 18:42:40 +02:00
// readWithTimeout reads from the relay connection into buf and returns the result of that read, unless
// serverResponseTimeout elapses or the parent context is cancelled first, in which case it returns an error.
//
// The read itself runs in a separate goroutine. After a timeout that goroutine stays blocked in Read until
// the connection is closed, so the caller is expected to close the connection on error and must not
// reuse buf before that.
func (c *Client) readWithTimeout(buf []byte) (int, error) {
	ctx, cancel := context.WithTimeout(c.parentCtx, serverResponseTimeout)
	defer cancel()

	type readResult struct {
		n   int
		err error
	}

	// Capture the connection before starting the goroutine: the caller may reset c.relayConn to nil
	// right after this function returns with a timeout, and the goroutine may not have started yet.
	conn := c.relayConn

	// Buffered so the reader goroutine can always deliver its result and exit, even after a timeout.
	done := make(chan readResult, 1)
	go func() {
		n, err := conn.Read(buf)
		done <- readResult{n: n, err: err}
	}()

	select {
	case <-ctx.Done():
		return 0, fmt.Errorf("read operation timed out")
	case res := <-done:
		return res.n, res.err
	}
}