Introduce larger retries for the agent (#379)

The Management client will try reconnecting in case.
of network issues or non-permanent errors.
If the device was off-boarded, then the client will stop retrying.
This commit is contained in:
Misha Bragin 2022-07-02 20:38:16 +02:00 committed by GitHub
parent 8c953c5a2c
commit 3bdfa3cc8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 38 deletions

View File

@ -26,10 +26,10 @@ import (
func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Status) error { func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Status) error {
backOff := &backoff.ExponentialBackOff{ backOff := &backoff.ExponentialBackOff{
InitialInterval: time.Second, InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: 1,
Multiplier: backoff.DefaultMultiplier, Multiplier: 1.7,
MaxInterval: 10 * time.Second, MaxInterval: 15 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, // stop the client after 3 days trying (must be a huge problem, e.g permission denied) MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
} }
@ -43,7 +43,6 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Sta
}() }()
wrapErr := state.Wrap wrapErr := state.Wrap
// validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey) myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil { if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error()) log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
@ -84,10 +83,9 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Sta
publicSSHKey) publicSSHKey)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.PermissionDenied) {
log.Info("peer registration required. Please run `netbird status` for details")
state.Set(StatusNeedsLogin) state.Set(StatusNeedsLogin)
return nil return backoff.Permanent(wrapErr(err)) // unrecoverable error
} }
return wrapErr(err) return wrapErr(err)
} }
@ -158,7 +156,7 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Sta
return wrapErr(err) return wrapErr(err)
} }
log.Info("stopped Netbird client") log.Info("stopped NetBird client")
if _, err := state.Status(); err == ErrResetConnection { if _, err := state.Status(); err == ErrResetConnection {
return err return err
@ -169,7 +167,7 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Sta
err = backoff.Retry(operation, backOff) err = backoff.Retry(operation, backOff)
if err != nil { if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err) log.Debugf("exiting client retry loop due to unrecoverable error: %s", err)
return err return err
} }
return nil return nil

View File

@ -37,7 +37,7 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
} }
mgmCtx, cancel := context.WithTimeout(ctx, time.Second*3) mgmCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()
conn, err := grpc.DialContext( conn, err := grpc.DialContext(
mgmCtx, mgmCtx,
@ -72,10 +72,10 @@ func (c *GrpcClient) Close() error {
func defaultBackoff(ctx context.Context) backoff.BackOff { func defaultBackoff(ctx context.Context) backoff.BackOff {
return backoff.WithContext(&backoff.ExponentialBackOff{ return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond, InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: 1,
Multiplier: backoff.DefaultMultiplier, Multiplier: 1.7,
MaxInterval: 10 * time.Second, MaxInterval: 10 * time.Second,
MaxElapsedTime: 12 * time.Hour, // stop after 12 hours of trying, the error will be propagated to the general retry of the client MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
}, ctx) }, ctx)
@ -95,20 +95,26 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
operation := func() error { operation := func() error {
log.Debugf("management connection state %v", c.conn.GetState()) log.Debugf("management connection state %v", c.conn.GetState())
if !c.ready() { connState := c.conn.GetState()
return fmt.Errorf("no connection to management") if connState == connectivity.Shutdown {
return backoff.Permanent(fmt.Errorf("connection to management has been shut down"))
} else if !(connState == connectivity.Ready || connState == connectivity.Idle) {
c.conn.WaitForStateChange(c.ctx, connState)
return fmt.Errorf("connection to management is not ready and in %s state", connState)
} }
// todo we already have it since we did the Login, maybe cache it locally?
serverPubKey, err := c.GetServerPublicKey() serverPubKey, err := c.GetServerPublicKey()
if err != nil { if err != nil {
log.Errorf("failed getting Management Service public key: %s", err) log.Debugf("failed getting Management Service public key: %s", err)
return err return err
} }
stream, err := c.connectToStream(*serverPubKey) stream, err := c.connectToStream(*serverPubKey)
if err != nil { if err != nil {
log.Errorf("failed to open Management Service stream: %s", err) log.Debugf("failed to open Management Service stream: %s", err)
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
}
return err return err
} }
@ -117,10 +123,13 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
// blocking until error // blocking until error
err = c.receiveEvents(stream, *serverPubKey, msgHandler) err = c.receiveEvents(stream, *serverPubKey, msgHandler)
if err != nil { if err != nil {
if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.InvalidArgument || s.Code() == codes.PermissionDenied) { if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
return backoff.Permanent(err) return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
} }
// we need this reset because after a successful connection and a consequent error, backoff lib doesn't
// reset times and next try will start with a long delay
backOff.Reset() backOff.Reset()
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err return err
} }
@ -129,7 +138,7 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
err := backoff.Retry(operation, backOff) err := backoff.Retry(operation, backOff)
if err != nil { if err != nil {
log.Warnf("exiting Management Service connection retry loop due to Permanent error: %s", err) log.Warnf("exiting the Management service connection retry loop due to the unrecoverable error: %s", err)
return err return err
} }
@ -156,11 +165,11 @@ func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, se
for { for {
update, err := stream.Recv() update, err := stream.Recv()
if err == io.EOF { if err == io.EOF {
log.Errorf("Management stream has been closed by server: %s", err) log.Debugf("Management stream has been closed by server: %s", err)
return err return err
} }
if err != nil { if err != nil {
log.Warnf("disconnected from Management Service sync stream: %v", err) log.Debugf("disconnected from Management Service sync stream: %v", err)
return err return err
} }
@ -180,13 +189,13 @@ func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, se
} }
} }
// GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server) // GetServerPublicKey returns server's WireGuard public key (used later for encrypting messages sent to the server)
func (c *GrpcClient) GetServerPublicKey() (*wgtypes.Key, error) { func (c *GrpcClient) GetServerPublicKey() (*wgtypes.Key, error) {
if !c.ready() { if !c.ready() {
return nil, fmt.Errorf("no connection to management") return nil, fmt.Errorf("no connection to management")
} }
mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2) mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
defer cancel() defer cancel()
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{}) resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
if err != nil { if err != nil {
@ -210,7 +219,7 @@ func (c *GrpcClient) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*pro
log.Errorf("failed to encrypt message: %s", err) log.Errorf("failed to encrypt message: %s", err)
return nil, err return nil, err
} }
mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2) mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
defer cancel() defer cancel()
resp, err := c.realClient.Login(mgmCtx, &proto.EncryptedMessage{ resp, err := c.realClient.Login(mgmCtx, &proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(), WgPubKey: c.key.PublicKey().String(),

View File

@ -89,14 +89,13 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
func defaultBackoff(ctx context.Context) backoff.BackOff { func defaultBackoff(ctx context.Context) backoff.BackOff {
return backoff.WithContext(&backoff.ExponentialBackOff{ return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond, InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: 1,
Multiplier: backoff.DefaultMultiplier, Multiplier: 1.7,
MaxInterval: 10 * time.Second, MaxInterval: 10 * time.Second,
MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
}, ctx) }, ctx)
} }
// Receive Connects to the Signal Exchange message stream and starts receiving messages. // Receive Connects to the Signal Exchange message stream and starts receiving messages.
@ -112,8 +111,12 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error {
c.notifyStreamDisconnected() c.notifyStreamDisconnected()
log.Debugf("signal connection state %v", c.signalConn.GetState()) log.Debugf("signal connection state %v", c.signalConn.GetState())
if !c.Ready() { connState := c.signalConn.GetState()
return fmt.Errorf("no connection to signal") if connState == connectivity.Shutdown {
return backoff.Permanent(fmt.Errorf("connection to signal has been shut down"))
} else if !(connState == connectivity.Ready || connState == connectivity.Idle) {
c.signalConn.WaitForStateChange(c.ctx, connState)
return fmt.Errorf("connection to signal is not ready and in %s state", connState)
} }
// connect to Signal stream identifying ourselves with a public Wireguard key // connect to Signal stream identifying ourselves with a public Wireguard key
@ -131,7 +134,8 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error {
// start receiving messages from the Signal stream (from other peers through signal) // start receiving messages from the Signal stream (from other peers through signal)
err = c.receive(stream, msgHandler) err = c.receive(stream, msgHandler)
if err != nil { if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) // we need this reset because after a successful connection and a consequent error, backoff lib doesn't
// reset times and next try will start with a long delay
backOff.Reset() backOff.Reset()
return err return err
} }
@ -141,7 +145,7 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error {
err := backoff.Retry(operation, backOff) err := backoff.Retry(operation, backOff)
if err != nil { if err != nil {
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err) log.Errorf("exiting the Signal service connection retry loop due to the unrecoverable error: %v", err)
return err return err
} }
@ -308,13 +312,13 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient,
for { for {
msg, err := stream.Recv() msg, err := stream.Recv()
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
log.Warnf("stream canceled (usually indicates shutdown)") log.Debugf("stream canceled (usually indicates shutdown)")
return err return err
} else if s.Code() == codes.Unavailable { } else if s.Code() == codes.Unavailable {
log.Warnf("Signal Service is unavailable") log.Debugf("Signal Service is unavailable")
return err return err
} else if err == io.EOF { } else if err == io.EOF {
log.Warnf("Signal Service stream closed by server") log.Debugf("Signal Service stream closed by server")
return err return err
} else if err != nil { } else if err != nil {
return err return err