fix: client app retry logic (#144)

* fix: retry logic
This commit is contained in:
Mikhail Bragin 2021-11-01 09:34:06 +01:00 committed by GitHub
parent 2c729fe5cc
commit d040cfed7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 122 additions and 116 deletions

View File

@ -1,7 +1,6 @@
package cmd
import (
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -11,31 +10,12 @@ import (
func (p *program) Start(s service.Service) error {
var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
// Start should not block. Do the actual work async.
log.Info("starting service") //nolint
go func() {
operation := func() error {
err := runClient()
if err != nil {
log.Warnf("retrying Wiretrustee client app due to error: %v", err)
return err
}
return nil
}
err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
log.Errorf("stopped Wiretrustee client app due to error: %v", err)
return
}
}()

View File

@ -2,6 +2,7 @@ package cmd
import (
"context"
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -12,6 +13,7 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"time"
)
var (
@ -117,6 +119,18 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK
}
func runClient() error {
var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: time.Hour,
MaxElapsedTime: 24 * 3 * time.Hour,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
operation := func() error {
config, err := internal.ReadConfig(managementURL, configPath)
if err != nil {
log.Errorf("failed reading config %s %v", configPath, err)
@ -174,6 +188,8 @@ func runClient() error {
case <-ctx.Done():
}
backOff.Reset()
err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
@ -198,5 +214,12 @@ func runClient() error {
log.Info("stopped Wiretrustee client")
return ctx.Err()
}
err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
return err
}
return nil
}

View File

@ -70,8 +70,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
MaxInterval: 15 * time.Minute,
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
@ -103,12 +103,10 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
// blocking until error
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
if err != nil {
/*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied {
//todo handle differently??
}*/
backOff.Reset()
return err
}
backOff.Reset()
return nil
}

View File

@ -81,8 +81,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
MaxInterval: 15 * time.Minute,
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
@ -101,14 +101,19 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
operation := func() error {
err := c.connect(c.key.PublicKey().String(), msgHandler)
stream, err := c.connect(c.key.PublicKey().String())
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
c.connWg.Add(1)
return err
}
err = c.receive(stream, msgHandler)
if err != nil {
backOff.Reset()
return err
}
return nil
}
@ -120,7 +125,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
}()
}
func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error {
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
c.stream = nil
// add key fingerprint to the request header to be identified on the server side
@ -131,23 +136,23 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error)
c.stream = stream
if err != nil {
return err
return nil, err
}
// blocks
header, err := c.stream.Header()
if err != nil {
return err
return nil, err
}
registered := header.Get(proto.HeaderRegistered)
if len(registered) == 0 {
return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
}
//connection established we are good to use the stream
c.connWg.Done()
log.Infof("connected to the Signal Exchange Stream")
return c.receive(stream, msgHandler)
return stream, nil
}
// WaitConnected waits until the client is connected to the message stream