2024-09-08 12:06:14 +02:00
|
|
|
package client
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"time"
|
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
"github.com/cenkalti/backoff/v4"
|
2024-09-08 12:06:14 +02:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
)
|
|
|
|
|
|
|
|
// reconnectingTimeout is the upper bound for the delay between two
// consecutive reconnection attempts; exponentTicker uses it as the
// MaxInterval of its exponential backoff.
var reconnectingTimeout = 60 * time.Second
|
|
|
|
|
|
|
|
// Guard manage the reconnection tries to the Relay server in case of disconnection event.
|
|
|
|
type Guard struct {
|
2024-11-22 18:12:34 +01:00
|
|
|
// OnNewRelayClient is a channel that is used to notify the relay client about a new relay client instance.
|
|
|
|
OnNewRelayClient chan *Client
|
|
|
|
serverPicker *ServerPicker
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewGuard creates a new guard for the relay client.
|
2024-11-22 18:12:34 +01:00
|
|
|
func NewGuard(sp *ServerPicker) *Guard {
|
2024-09-08 12:06:14 +02:00
|
|
|
g := &Guard{
|
2024-11-22 18:12:34 +01:00
|
|
|
OnNewRelayClient: make(chan *Client, 1),
|
|
|
|
serverPicker: sp,
|
2024-09-08 12:06:14 +02:00
|
|
|
}
|
|
|
|
return g
|
|
|
|
}
|
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
// StartReconnectTrys is called when the relay client is disconnected from the relay server.
|
|
|
|
// It attempts to reconnect to the relay server. The function first tries a quick reconnect
|
|
|
|
// to the same server that was used before, if the server URL is still valid. If the quick
|
|
|
|
// reconnect fails, it starts a ticker to periodically attempt server picking until it
|
|
|
|
// succeeds or the context is done.
|
|
|
|
//
|
|
|
|
// Parameters:
|
|
|
|
// - ctx: The context to control the lifecycle of the reconnection attempts.
|
|
|
|
// - relayClient: The relay client instance that was disconnected.
|
2024-09-08 12:06:14 +02:00
|
|
|
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent
|
2024-11-22 18:12:34 +01:00
|
|
|
func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
|
|
|
|
if relayClient == nil {
|
|
|
|
goto RETRY
|
|
|
|
}
|
|
|
|
if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) {
|
2024-10-24 11:43:14 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
RETRY:
|
|
|
|
ticker := exponentTicker(ctx)
|
2024-09-08 12:06:14 +02:00
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ticker.C:
|
2024-11-22 18:12:34 +01:00
|
|
|
if err := g.retry(ctx); err != nil {
|
|
|
|
log.Errorf("failed to pick new Relay server: %s", err)
|
2024-09-08 12:06:14 +02:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
return
|
2024-11-22 18:12:34 +01:00
|
|
|
case <-ctx.Done():
|
2024-09-08 12:06:14 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2024-10-24 11:43:14 +02:00
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
func (g *Guard) retry(ctx context.Context) error {
|
|
|
|
log.Infof("try to pick up a new Relay server")
|
|
|
|
relayClient, err := g.serverPicker.PickServer(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// prevent to work with a deprecated Relay client instance
|
|
|
|
g.drainRelayClientChan()
|
|
|
|
|
|
|
|
g.OnNewRelayClient <- relayClient
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool {
|
|
|
|
ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond)
|
2024-10-24 11:43:14 +02:00
|
|
|
defer cancel()
|
|
|
|
<-ctx.Done()
|
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
if parentCtx.Err() != nil {
|
2024-10-24 11:43:14 +02:00
|
|
|
return false
|
|
|
|
}
|
2024-11-22 18:12:34 +01:00
|
|
|
log.Infof("try to reconnect to Relay server: %s", rc.connectionURL)
|
2024-10-24 11:43:14 +02:00
|
|
|
|
2024-11-22 18:12:34 +01:00
|
|
|
if err := rc.Connect(); err != nil {
|
2024-10-24 11:43:14 +02:00
|
|
|
log.Errorf("failed to reconnect to relay server: %s", err)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
2024-11-22 18:12:34 +01:00
|
|
|
|
|
|
|
func (g *Guard) drainRelayClientChan() {
|
|
|
|
select {
|
|
|
|
case <-g.OnNewRelayClient:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Guard) isServerURLStillValid(rc *Client) bool {
|
|
|
|
for _, url := range g.serverPicker.ServerURLs.Load().([]string) {
|
|
|
|
if url == rc.connectionURL {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func exponentTicker(ctx context.Context) *backoff.Ticker {
|
|
|
|
bo := backoff.WithContext(&backoff.ExponentialBackOff{
|
|
|
|
InitialInterval: 2 * time.Second,
|
|
|
|
Multiplier: 2,
|
|
|
|
MaxInterval: reconnectingTimeout,
|
|
|
|
Clock: backoff.SystemClock,
|
|
|
|
}, ctx)
|
|
|
|
|
|
|
|
return backoff.NewTicker(bo)
|
|
|
|
}
|