Move reconnection logic to separated struct

This commit is contained in:
Zoltán Papp 2024-06-01 11:25:00 +02:00
parent 4ff069a102
commit fd4ad15c83
4 changed files with 121 additions and 109 deletions

View File

@ -19,10 +19,6 @@ const (
serverResponseTimeout = 8 * time.Second serverResponseTimeout = 8 * time.Second
) )
var (
reconnectingTimeout = 5 * time.Second
)
type Msg struct { type Msg struct {
buf []byte buf []byte
} }
@ -47,35 +43,42 @@ type Client struct {
serviceIsRunning bool serviceIsRunning bool
serviceIsRunningMutex sync.Mutex serviceIsRunningMutex sync.Mutex
wgReadLoop sync.WaitGroup wgReadLoop sync.WaitGroup
onDisconnected chan struct{}
remoteAddr net.Addr remoteAddr net.Addr
onDisconnectListener func()
listenerMutex sync.Mutex
} }
func NewClient(ctx context.Context, serverAddress, peerID string) *Client { func NewClient(ctx context.Context, serverAddress, peerID string) *Client {
ctx, ctxCancel := context.WithCancel(ctx) ctx, ctxCancel := context.WithCancel(ctx)
hashedID, hashedStringId := messages.HashID(peerID) hashedID, hashedStringId := messages.HashID(peerID)
return &Client{ return &Client{
log: log.WithField("client_id", hashedStringId), log: log.WithField("client_id", hashedStringId),
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
serverAddress: serverAddress, serverAddress: serverAddress,
hashedID: hashedID, hashedID: hashedID,
conns: make(map[string]*connContainer), conns: make(map[string]*connContainer),
onDisconnected: make(chan struct{}),
} }
} }
func (c *Client) SetOnDisconnectListener(fn func()) {
c.listenerMutex.Lock()
defer c.listenerMutex.Unlock()
c.onDisconnectListener = fn
}
func (c *Client) Connect() error { func (c *Client) Connect() error {
c.serviceIsRunningMutex.Lock() c.serviceIsRunningMutex.Lock()
defer c.serviceIsRunningMutex.Unlock()
if c.serviceIsRunning { if c.serviceIsRunning {
c.serviceIsRunningMutex.Unlock()
return nil return nil
} }
err := c.connect() err := c.connect()
if err != nil { if err != nil {
c.serviceIsRunningMutex.Unlock()
return err return err
} }
@ -84,41 +87,6 @@ func (c *Client) Connect() error {
c.wgReadLoop.Add(1) c.wgReadLoop.Add(1)
go c.readLoop() go c.readLoop()
c.serviceIsRunningMutex.Unlock()
go func() {
<-c.ctx.Done()
cErr := c.close()
if cErr != nil {
log.Errorf("failed to close relay connection: %s", cErr)
}
}()
go c.reconnectGuard()
return nil
}
func (c *Client) ConnectWithoutReconnect() error {
c.serviceIsRunningMutex.Lock()
if c.serviceIsRunning {
c.serviceIsRunningMutex.Unlock()
return nil
}
err := c.connect()
if err != nil {
c.serviceIsRunningMutex.Unlock()
return err
}
c.serviceIsRunning = true
c.wgReadLoop.Add(1)
go c.readLoop()
c.serviceIsRunningMutex.Unlock()
go func() { go func() {
<-c.ctx.Done() <-c.ctx.Done()
cErr := c.close() cErr := c.close()
@ -210,33 +178,6 @@ func (c *Client) close() error {
return err return err
} }
func (c *Client) reconnectGuard() {
for {
c.wgReadLoop.Wait()
c.serviceIsRunningMutex.Lock()
if !c.serviceIsRunning {
c.serviceIsRunningMutex.Unlock()
return
}
log.Infof("reconnecting to relay server")
err := c.connect()
if err != nil {
log.Errorf("failed to reconnect to relay server: %s", err)
c.serviceIsRunningMutex.Unlock()
time.Sleep(reconnectingTimeout)
continue
}
log.Infof("reconnected to relay server")
c.wgReadLoop.Add(1)
go c.readLoop()
c.serviceIsRunningMutex.Unlock()
}
}
func (c *Client) handShake() error { func (c *Client) handShake() error {
defer func() { defer func() {
err := c.relayConn.SetReadDeadline(time.Time{}) err := c.relayConn.SetReadDeadline(time.Time{})
@ -322,6 +263,8 @@ func (c *Client) readLoop() {
} }
} }
c.notifyDisconnected()
if c.serviceIsRunning { if c.serviceIsRunning {
_ = c.relayConn.Close() _ = c.relayConn.Close()
} }
@ -384,3 +327,23 @@ func (c *Client) closeConn(id string) error {
return nil return nil
} }
func (c *Client) onDisconnect() {
c.listenerMutex.Lock()
defer c.listenerMutex.Unlock()
if c.onDisconnectListener == nil {
return
}
c.onDisconnectListener()
}
func (c *Client) notifyDisconnected() {
c.listenerMutex.Lock()
defer c.listenerMutex.Unlock()
if c.onDisconnectListener == nil {
return
}
go c.onDisconnectListener()
}

33
relay/client/guard.go Normal file
View File

@ -0,0 +1,33 @@
package client
import (
"context"
"time"
)
var (
reconnectingTimeout = 5 * time.Second
)
type Guard struct {
ctx context.Context
relayClient *Client
}
func NewGuard(context context.Context, relayClient *Client) *Guard {
g := &Guard{
ctx: context,
relayClient: relayClient,
}
return g
}
func (g *Guard) OnDisconnected() {
select {
case <-time.After(time.Second):
_ = g.relayClient.Connect()
case <-g.ctx.Done():
return
}
}

View File

@ -4,17 +4,19 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"sync"
) )
// Manager todo: thread safe
type Manager struct { type Manager struct {
ctx context.Context ctx context.Context
srvAddress string srvAddress string
peerID string peerID string
relayClient *Client relayClient *Client
reconnectGuard *Guard
relayClients map[string]*Client relayClients map[string]*Client
relayClientsMutex sync.Mutex
} }
func NewManager(ctx context.Context, serverAddress string, peerID string) *Manager { func NewManager(ctx context.Context, serverAddress string, peerID string) *Manager {
@ -28,13 +30,33 @@ func NewManager(ctx context.Context, serverAddress string, peerID string) *Manag
func (m *Manager) Serve() error { func (m *Manager) Serve() error {
m.relayClient = NewClient(m.ctx, m.srvAddress, m.peerID) m.relayClient = NewClient(m.ctx, m.srvAddress, m.peerID)
m.reconnectGuard = NewGuard(m.ctx, m.relayClient)
m.relayClient.SetOnDisconnectListener(m.reconnectGuard.OnDisconnected)
err := m.relayClient.Connect() err := m.relayClient.Connect()
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
if m.relayClient == nil {
return nil, fmt.Errorf("relay client not connected")
}
foreign, err := m.isForeignServer(serverAddress)
if err != nil {
return nil, err
}
if foreign {
return m.openConnVia(serverAddress, peerKey)
} else {
return m.relayClient.OpenConn(peerKey)
}
}
func (m *Manager) RelayAddress() (net.Addr, error) { func (m *Manager) RelayAddress() (net.Addr, error) {
if m.relayClient == nil { if m.relayClient == nil {
return nil, fmt.Errorf("relay client not connected") return nil, fmt.Errorf("relay client not connected")
@ -42,48 +64,38 @@ func (m *Manager) RelayAddress() (net.Addr, error) {
return m.relayClient.RelayRemoteAddress() return m.relayClient.RelayRemoteAddress()
} }
func (m *Manager) OpenConn(peerKey string) (net.Conn, error) { func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
if m.relayClient == nil {
return nil, fmt.Errorf("relay client not connected")
}
rAddr, err := m.relayClient.RelayRemoteAddress()
if err != nil {
return nil, fmt.Errorf("relay client not connected")
}
return m.OpenConnTo(rAddr.String(), peerKey)
}
func (m *Manager) OpenConnTo(serverAddress, peerKey string) (net.Conn, error) {
if m.relayClient == nil {
return nil, fmt.Errorf("relay client not connected")
}
rAddr, err := m.relayClient.RelayRemoteAddress()
if err != nil {
return nil, fmt.Errorf("relay client not connected")
}
if rAddr.String() == serverAddress {
return m.relayClient.OpenConn(peerKey)
}
relayClient, ok := m.relayClients[serverAddress] relayClient, ok := m.relayClients[serverAddress]
if ok { if ok {
return relayClient.OpenConn(peerKey) return relayClient.OpenConn(peerKey)
} }
relayClient = NewClient(m.ctx, serverAddress, m.peerID) relayClient = NewClient(m.ctx, serverAddress, m.peerID)
err = relayClient.ConnectWithoutReconnect() err := relayClient.Connect()
if err != nil { if err != nil {
return nil, err return nil, err
} }
relayClient.SetOnDisconnectListener(func() {
m.deleteRelayConn(serverAddress)
})
conn, err := relayClient.OpenConn(peerKey) conn, err := relayClient.OpenConn(peerKey)
if err != nil { if err != nil {
return nil, err return nil, err
} }
m.relayClients[serverAddress] = relayClient m.relayClients[serverAddress] = relayClient
return conn, nil return conn, nil
} }
func (m *Manager) deleteRelayConn(address string) {
delete(m.relayClients, address)
}
func (m *Manager) isForeignServer(address string) (bool, error) {
rAddr, err := m.relayClient.RelayRemoteAddress()
if err != nil {
return false, fmt.Errorf("relay client not connected")
}
return rAddr.String() != address, nil
}

View File

@ -48,6 +48,10 @@ func TestNewManager(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("failed to connect to server: %s", err) t.Fatalf("failed to connect to server: %s", err)
} }
aliceSrvAddr, err := clientAlice.RelayAddress()
if err != nil {
t.Fatalf("failed to get relay address: %s", err)
}
clientBob := NewManager(ctx, addr2, idBob) clientBob := NewManager(ctx, addr2, idBob)
err = clientBob.Serve() err = clientBob.Serve()
@ -59,12 +63,12 @@ func TestNewManager(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("failed to get relay address: %s", err) t.Fatalf("failed to get relay address: %s", err)
} }
connAliceToBob, err := clientAlice.OpenConnTo(bobsSrvAddr.String(), idBob) connAliceToBob, err := clientAlice.OpenConn(bobsSrvAddr.String(), idBob)
if err != nil { if err != nil {
t.Fatalf("failed to bind channel: %s", err) t.Fatalf("failed to bind channel: %s", err)
} }
connBobToAlice, err := clientBob.OpenConn(idAlice) connBobToAlice, err := clientBob.OpenConn(aliceSrvAddr.String(), idAlice)
if err != nil { if err != nil {
t.Fatalf("failed to bind channel: %s", err) t.Fatalf("failed to bind channel: %s", err)
} }