diff --git a/client/internal/connect.go b/client/internal/connect.go index 19fe13c1e..dfcb1ebf8 100644 --- a/client/internal/connect.go +++ b/client/internal/connect.go @@ -26,10 +26,10 @@ import ( func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Status) error { backOff := &backoff.ExponentialBackOff{ InitialInterval: time.Second, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: 10 * time.Second, - MaxElapsedTime: 24 * 3 * time.Hour, // stop the client after 3 days trying (must be a huge problem, e.g permission denied) + RandomizationFactor: 1, + Multiplier: 1.7, + MaxInterval: 15 * time.Second, + MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months Stop: backoff.Stop, Clock: backoff.SystemClock, } @@ -43,7 +43,6 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Sta }() wrapErr := state.Wrap - // validate our peer's Wireguard PRIVATE key myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey) if err != nil { log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error()) @@ -84,10 +83,9 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Sta publicSSHKey) if err != nil { log.Debug(err) - if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { - log.Info("peer registration required. Please run `netbird status` for details") + if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.PermissionDenied) { state.Set(StatusNeedsLogin) - return nil + return backoff.Permanent(wrapErr(err)) // unrecoverable error } return wrapErr(err) } @@ -158,7 +156,7 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Sta return wrapErr(err) } - log.Info("stopped Netbird client") + log.Info("stopped NetBird client") if _, err := state.Status(); err == ErrResetConnection { return err @@ -169,7 +167,7 @@ func RunClient(ctx context.Context, config *Config, statusRecorder *nbStatus.Sta err = backoff.Retry(operation, backOff) if err != nil { - log.Errorf("exiting client retry loop due to unrecoverable error: %s", err) + log.Debugf("exiting client retry loop due to unrecoverable error: %s", err) return err } return nil diff --git a/management/client/grpc.go b/management/client/grpc.go index 39e94bc96..a2847e8b7 100644 --- a/management/client/grpc.go +++ b/management/client/grpc.go @@ -37,7 +37,7 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) } - mgmCtx, cancel := context.WithTimeout(ctx, time.Second*3) + mgmCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() conn, err := grpc.DialContext( mgmCtx, @@ -72,10 +72,10 @@ func (c *GrpcClient) Close() error { func defaultBackoff(ctx context.Context) backoff.BackOff { return backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 800 * time.Millisecond, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, + RandomizationFactor: 1, + Multiplier: 1.7, MaxInterval: 10 * time.Second, - MaxElapsedTime: 12 * time.Hour, // stop after 12 hours of trying, the error will be propagated to the general retry of the client + MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months Stop: backoff.Stop, Clock: backoff.SystemClock, }, ctx) @@ -95,20 +95,26 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error operation := func() error { log.Debugf("management connection state %v", c.conn.GetState()) - if !c.ready() { - return fmt.Errorf("no connection to management") + connState := c.conn.GetState() + if connState == connectivity.Shutdown { + return backoff.Permanent(fmt.Errorf("connection to management has been shut down")) + } else if !(connState == connectivity.Ready || connState == connectivity.Idle) { + c.conn.WaitForStateChange(c.ctx, connState) + return fmt.Errorf("connection to management is not ready and in %s state", connState) } - // todo we already have it since we did the Login, maybe cache it locally? serverPubKey, err := c.GetServerPublicKey() if err != nil { - log.Errorf("failed getting Management Service public key: %s", err) + log.Debugf("failed getting Management Service public key: %s", err) return err } stream, err := c.connectToStream(*serverPubKey) if err != nil { - log.Errorf("failed to open Management Service stream: %s", err) + log.Debugf("failed to open Management Service stream: %s", err) + if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { + return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer + } return err } @@ -117,10 +123,13 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error // blocking until error err = c.receiveEvents(stream, *serverPubKey, msgHandler) if err != nil { - if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.InvalidArgument || s.Code() == codes.PermissionDenied) { - return backoff.Permanent(err) + if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { + return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer } + // we need this reset because after a successful connection and a consequent error, backoff lib doesn't + // reset times and next try will start with a long delay backOff.Reset() + log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) return err } @@ -129,7 +138,7 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error err := backoff.Retry(operation, backOff) if err != nil { - log.Warnf("exiting Management Service connection retry loop due to Permanent error: %s", err) + log.Warnf("exiting the Management service connection retry loop due to the unrecoverable error: %s", err) return err } @@ -156,11 +165,11 @@ func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, se for { update, err := stream.Recv() if err == io.EOF { - log.Errorf("Management stream has been closed by server: %s", err) + log.Debugf("Management stream has been closed by server: %s", err) return err } if err != nil { - log.Warnf("disconnected from Management Service sync stream: %v", err) + log.Debugf("disconnected from Management Service sync stream: %v", err) return err } @@ -180,13 +189,13 @@ func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, se } } -// GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server) +// GetServerPublicKey returns server's WireGuard public key (used later for encrypting messages sent to the server) func (c *GrpcClient) GetServerPublicKey() (*wgtypes.Key, error) { if !c.ready() { return nil, fmt.Errorf("no connection to management") } - mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2) + mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) defer cancel() resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{}) if err != nil { @@ -210,7 +219,7 @@ func (c *GrpcClient) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*pro log.Errorf("failed to encrypt message: %s", err) return nil, err } - mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2) + mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) defer cancel() resp, err := c.realClient.Login(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), diff --git a/signal/client/grpc.go b/signal/client/grpc.go index 4d2463766..5e23923e0 100644 --- a/signal/client/grpc.go +++ b/signal/client/grpc.go @@ -89,14 +89,13 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo func defaultBackoff(ctx context.Context) backoff.BackOff { return backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 800 * time.Millisecond, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, + RandomizationFactor: 1, + Multiplier: 1.7, MaxInterval: 10 * time.Second, - MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client + MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months Stop: backoff.Stop, Clock: backoff.SystemClock, }, ctx) - } // Receive Connects to the Signal Exchange message stream and starts receiving messages. @@ -112,8 +111,12 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error { c.notifyStreamDisconnected() log.Debugf("signal connection state %v", c.signalConn.GetState()) - if !c.Ready() { - return fmt.Errorf("no connection to signal") + connState := c.signalConn.GetState() + if connState == connectivity.Shutdown { + return backoff.Permanent(fmt.Errorf("connection to signal has been shut down")) + } else if !(connState == connectivity.Ready || connState == connectivity.Idle) { + c.signalConn.WaitForStateChange(c.ctx, connState) + return fmt.Errorf("connection to signal is not ready and in %s state", connState) } // connect to Signal stream identifying ourselves with a public Wireguard key @@ -131,7 +134,8 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error { // start receiving messages from the Signal stream (from other peers through signal) err = c.receive(stream, msgHandler) if err != nil { - log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) + // we need this reset because after a successful connection and a consequent error, backoff lib doesn't + // reset times and next try will start with a long delay backOff.Reset() return err } @@ -141,7 +145,7 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error { err := backoff.Retry(operation, backOff) if err != nil { - log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err) + log.Errorf("exiting the Signal service connection retry loop due to the unrecoverable error: %v", err) return err } @@ -308,13 +312,13 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient, for { msg, err := stream.Recv() if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - log.Warnf("stream canceled (usually indicates shutdown)") + log.Debugf("stream canceled (usually indicates shutdown)") return err } else if s.Code() == codes.Unavailable { - log.Warnf("Signal Service is unavailable") + log.Debugf("Signal Service is unavailable") return err } else if err == io.EOF { - log.Warnf("Signal Service stream closed by server") + log.Debugf("Signal Service stream closed by server") return err } else if err != nil { return err