From a40d4d2f325b56ba92a99c56b3235ca1b3311774 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Tue, 4 Jun 2024 14:40:35 +0200 Subject: [PATCH] - add comments - avoid double closing messages - add cleanup routine for relay manager --- relay/client/client.go | 35 +++++---- relay/client/manager.go | 100 +++++++++++++++++++++---- relay/client/manager_test.go | 74 +++++++++++++++--- relay/server/listener/wsnhooyr/conn.go | 1 + relay/server/server.go | 1 - 5 files changed, 172 insertions(+), 39 deletions(-) diff --git a/relay/client/client.go b/relay/client/client.go index 64b20f9e1..fa4f0da6a 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -35,7 +35,6 @@ type connContainer struct { type Client struct { log *log.Entry parentCtx context.Context - ctx context.Context ctxCancel context.CancelFunc serverAddress string hashedID []byte @@ -66,13 +65,6 @@ func NewClient(ctx context.Context, serverAddress, peerID string) *Client { } } -// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed. -func (c *Client) SetOnDisconnectListener(fn func()) { - c.listenerMutex.Lock() - defer c.listenerMutex.Unlock() - c.onDisconnectListener = fn -} - // Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs. func (c *Client) Connect() error { c.readLoopMutex.Lock() @@ -92,8 +84,9 @@ func (c *Client) Connect() error { c.serviceIsRunning = true - c.ctx, c.ctxCancel = context.WithCancel(c.parentCtx) - context.AfterFunc(c.ctx, func() { + var ctx context.Context + ctx, c.ctxCancel = context.WithCancel(c.parentCtx) + context.AfterFunc(ctx, func() { cErr := c.Close() if cErr != nil { log.Errorf("failed to close relay connection: %s", cErr) @@ -136,6 +129,19 @@ func (c *Client) RelayRemoteAddress() (net.Addr, error) { return c.remoteAddr, nil } +// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed. 
+func (c *Client) SetOnDisconnectListener(fn func()) { + c.listenerMutex.Lock() + defer c.listenerMutex.Unlock() + c.onDisconnectListener = fn +} + +func (c *Client) HasConns() bool { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.conns) > 0 +} + // Close closes the connection to the relay server and all connections to other peers. func (c *Client) Close() error { c.readLoopMutex.Lock() @@ -143,14 +149,17 @@ c.mu.Lock() var err error - if c.serviceIsRunning { - c.serviceIsRunning = false - err = c.relayConn.Close() + if !c.serviceIsRunning { + return nil } + + c.serviceIsRunning = false + err = c.relayConn.Close() c.closeAllConns() c.mu.Unlock() c.wgReadLoop.Wait() + c.log.Infof("relay client has been closed: %s", c.serverAddress) c.ctxCancel() return err } diff --git a/relay/client/manager.go b/relay/client/manager.go index f8d5a28fc..98ec3a1c8 100644 --- a/relay/client/manager.go +++ b/relay/client/manager.go @@ -5,10 +5,18 @@ import ( "fmt" "net" "sync" + "time" log "github.com/sirupsen/logrus" ) + +var ( + relayCleanupInterval = 60 * time.Second +) + +// RelayTrack holds the relay clients for the foreign relay servers. +// With the mutex we can ensure a new connection is opened only after the relay connection has been established with +// the relay server. type RelayTrack struct { sync.RWMutex relayClient *Client @@ -18,6 +26,12 @@ func NewRelayTrack() *RelayTrack { return &RelayTrack{} } +// Manager is a manager for the relay client. It establishes one persistent connection to the given relay server. In case +// of network error the manager will try to reconnect to the server. +// The manager also manages temporary relay connections. If a client wants to communicate with another client on a +// different relay server, the manager will establish a new connection to the relay server. The connection with these +// relay servers will be closed if there is no active connection. 
Periodically the manager will check if there is any + unused relay connection and close it. type Manager struct { ctx context.Context srvAddress string @@ -39,18 +53,26 @@ } } -func (m *Manager) Serve() error { +// Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop. +// todo: consider returning an error if the initial connection to the relay server is not established. +func (m *Manager) Serve() { m.relayClient = NewClient(m.ctx, m.srvAddress, m.peerID) m.reconnectGuard = NewGuard(m.ctx, m.relayClient) m.relayClient.SetOnDisconnectListener(m.reconnectGuard.OnDisconnected) err := m.relayClient.Connect() if err != nil { - return err + log.Errorf("failed to connect to relay server, keep trying to reconnect: %s", err) + return } - return nil + m.startCleanupLoop() + + return } +// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be +// established via the relay server. If the peer is on a different relay server, the manager will establish a new +// connection to the relay server. func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) { if m.relayClient == nil { return nil, fmt.Errorf("relay client not connected") @@ -68,6 +90,8 @@ } } +// RelayAddress returns the address of the permanent relay server. It could change if the network connection is lost. +// This address will be sent to the target peer to choose the common relay server for the communication. 
func (m *Manager) RelayAddress() (net.Addr, error) { if m.relayClient == nil { return nil, fmt.Errorf("relay client not connected") @@ -76,20 +100,31 @@ func (m *Manager) RelayAddress() (net.Addr, error) { } func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { + // check if already has a connection to the desired relay server m.relayClientsMutex.RLock() - relayTrack, ok := m.relayClients[serverAddress] + rt, ok := m.relayClients[serverAddress] if ok { - relayTrack.RLock() + rt.RLock() m.relayClientsMutex.RUnlock() - defer relayTrack.RUnlock() - return relayTrack.relayClient.OpenConn(peerKey) + defer rt.RUnlock() + return rt.relayClient.OpenConn(peerKey) } m.relayClientsMutex.RUnlock() - rt := NewRelayTrack() - rt.Lock() - + // if not, establish a new connection but check it again (because changed the lock type) before starting the + // connection m.relayClientsMutex.Lock() + rt, ok = m.relayClients[serverAddress] + if ok { + rt.RLock() + m.relayClientsMutex.Unlock() + defer rt.RUnlock() + return rt.relayClient.OpenConn(peerKey) + } + + // create a new relay client and store it in the relayClients map + rt = NewRelayTrack() + rt.Lock() m.relayClients[serverAddress] = rt m.relayClientsMutex.Unlock() @@ -102,25 +137,25 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { m.relayClientsMutex.Unlock() return nil, err } + // if connection closed then delete the relay client from the list relayClient.SetOnDisconnectListener(func() { m.deleteRelayConn(serverAddress) }) + rt.relayClient = relayClient rt.Unlock() conn, err := relayClient.OpenConn(peerKey) if err != nil { return nil, err } - return conn, nil } func (m *Manager) deleteRelayConn(address string) { log.Infof("deleting relay client for %s", address) m.relayClientsMutex.Lock() - defer m.relayClientsMutex.Unlock() - delete(m.relayClients, address) + m.relayClientsMutex.Unlock() } func (m *Manager) isForeignServer(address string) (bool, error) { @@ -130,3 
+165,42 @@ func (m *Manager) isForeignServer(address string) (bool, error) { } return rAddr.String() != address, nil } + +func (m *Manager) startCleanupLoop() { + if m.ctx.Err() != nil { + return + } + + ticker := time.NewTicker(relayCleanupInterval) + go func() { + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.cleanUpUnusedRelays() + } + } + }() +} + +func (m *Manager) cleanUpUnusedRelays() { + m.relayClientsMutex.Lock() + defer m.relayClientsMutex.Unlock() + + for addr, rt := range m.relayClients { + rt.Lock() + if rt.relayClient.HasConns() { + rt.Unlock() + continue + } + rt.relayClient.SetOnDisconnectListener(nil) + go func() { + _ = rt.relayClient.Close() + }() + log.Debugf("clean up relay client: %s", addr) + delete(m.relayClients, addr) + rt.Unlock() + } +} diff --git a/relay/client/manager_test.go b/relay/client/manager_test.go index 0fbdddd7e..d0e22dabd 100644 --- a/relay/client/manager_test.go +++ b/relay/client/manager_test.go @@ -3,6 +3,7 @@ package client import ( "context" "testing" + "time" log "github.com/sirupsen/logrus" @@ -47,18 +48,13 @@ func TestForeignConn(t *testing.T) { idAlice := "alice" log.Debugf("connect by alice") clientAlice := NewManager(ctx, addr1, idAlice) - err := clientAlice.Serve() - if err != nil { - t.Fatalf("failed to connect to server: %s", err) - } + clientAlice.Serve() idBob := "bob" log.Debugf("connect by bob") clientBob := NewManager(ctx, addr2, idBob) - err = clientBob.Serve() - if err != nil { - t.Fatalf("failed to connect to server: %s", err) - } + clientBob.Serve() + bobsSrvAddr, err := clientBob.RelayAddress() if err != nil { t.Fatalf("failed to get relay address: %s", err) @@ -137,10 +133,7 @@ func TestForeginConnClose(t *testing.T) { idAlice := "alice" log.Debugf("connect by alice") clientAlice := NewManager(ctx, addr1, idAlice) - err := clientAlice.Serve() - if err != nil { - t.Fatalf("failed to connect to server: %s", err) - } + clientAlice.Serve() conn, err := 
clientAlice.OpenConn(addr2, "anotherpeer") if err != nil { @@ -154,3 +147,60 @@ func TestForeginConnClose(t *testing.T) { select {} } + +func TestForeginAutoClose(t *testing.T) { + ctx := context.Background() + + addr1 := "localhost:1234" + srv1 := server.NewServer() + go func() { + err := srv1.Listen(addr1) + if err != nil { + t.Fatalf("failed to bind server: %s", err) + } + }() + + defer func() { + err := srv1.Close() + if err != nil { + t.Errorf("failed to close server: %s", err) + } + }() + + addr2 := "localhost:2234" + srv2 := server.NewServer() + go func() { + err := srv2.Listen(addr2) + if err != nil { + t.Fatalf("failed to bind server: %s", err) + } + }() + + defer func() { + err := srv2.Close() + if err != nil { + t.Errorf("failed to close server: %s", err) + } + }() + + idAlice := "alice" + log.Debugf("connect by alice") + mgr := NewManager(ctx, addr1, idAlice) + relayCleanupInterval = 2 * time.Second + mgr.Serve() + + conn, err := mgr.OpenConn(addr2, "anotherpeer") + if err != nil { + t.Fatalf("failed to bind channel: %s", err) + } + + err = conn.Close() + if err != nil { + t.Fatalf("failed to close connection: %s", err) + } + + time.Sleep(relayCleanupInterval + 1*time.Second) + if len(mgr.relayClients) != 0 { + t.Errorf("expected 0, got %d", len(mgr.relayClients)) + } +} diff --git a/relay/server/listener/wsnhooyr/conn.go b/relay/server/listener/wsnhooyr/conn.go index c3461cb85..72b6bfecb 100644 --- a/relay/server/listener/wsnhooyr/conn.go +++ b/relay/server/listener/wsnhooyr/conn.go @@ -79,6 +79,7 @@ func (c *Conn) Close() error { return c.Conn.Close(websocket.StatusNormalClosure, "") } +// todo: fix io.EOF handling func ioErrHandling(err error) error { var wErr *websocket.CloseError if !errors.As(err, &wErr) { diff --git a/relay/server/server.go b/relay/server/server.go index 7e729c309..cb9816907 100644 --- a/relay/server/server.go +++ b/relay/server/server.go @@ -18,7 +18,6 @@ import ( // Server // todo: // authentication: provide JWT token via RPC 
call. The MGM server can forward the token to the agents. -// connection timeout handling type Server struct { store *Store