From 6fb8fbab1974fe424025a3f1b99532d5339e3ce0 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Wed, 23 Nov 2022 07:13:38 -0500 Subject: [PATCH] Refactored SSL/Auth handling of Emails (#774) --- apprise/plugins/NotifyEmail.py | 70 +++++---- test/test_plugin_email.py | 270 ++++++++++++++++++++++++++++++++- 2 files changed, 305 insertions(+), 35 deletions(-) diff --git a/apprise/plugins/NotifyEmail.py b/apprise/plugins/NotifyEmail.py index b1089a32..12518d3d 100644 --- a/apprise/plugins/NotifyEmail.py +++ b/apprise/plugins/NotifyEmail.py @@ -63,15 +63,23 @@ class WebBaseLogin: # Secure Email Modes class SecureMailMode: + INSECURE = "insecure" SSL = "ssl" STARTTLS = "starttls" # Define all of the secure modes (used during validation) -SECURE_MODES = ( - SecureMailMode.SSL, - SecureMailMode.STARTTLS, -) +SECURE_MODES = { + SecureMailMode.STARTTLS: { + 'default_port': 587, + }, + SecureMailMode.SSL: { + 'default_port': 465, + }, + SecureMailMode.INSECURE: { + 'default_port': 25, + }, +} # To attempt to make this script stupid proof, if we detect an email address # that is part of the this table, we can pre-use a lot more defaults if they @@ -328,15 +336,6 @@ class NotifyEmail(NotifyBase): # Default Notify Format notify_format = NotifyFormat.HTML - # Default Non-Encryption Port - default_port = 25 - - # Default Secure Port - default_secure_port = 587 - - # Default Secure Mode - default_secure_mode = SecureMailMode.STARTTLS - # Default SMTP Timeout (in seconds) socket_connect_timeout = 15 @@ -446,14 +445,6 @@ class NotifyEmail(NotifyBase): """ super().__init__(**kwargs) - # Handle SMTP vs SMTPS (Secure vs UnSecure) - if not self.port: - if self.secure: - self.port = self.default_secure_port - - else: - self.port = self.default_port - # Acquire Email 'To' self.targets = list() @@ -511,9 +502,14 @@ class NotifyEmail(NotifyBase): smtp_host if isinstance(smtp_host, str) else '' # Now detect secure mode - self.secure_mode = self.default_secure_mode \ - if not isinstance(secure_mode, str) \ - else secure_mode.lower() + if secure_mode: + self.secure_mode = None \ + if not isinstance(secure_mode, str) \ + else secure_mode.lower() + else: + self.secure_mode = SecureMailMode.INSECURE \ + if not self.secure else self.template_args['mode']['default'] + if self.secure_mode not in SECURE_MODES: msg = 'The secure mode specified ({}) is invalid.'\ .format(secure_mode) @@ -590,6 +586,15 @@ class NotifyEmail(NotifyBase): # Apply any defaults based on certain known configurations self.NotifyEmailDefaults(secure_mode=secure_mode, **kwargs) + if not self.secure and self.secure_mode != SecureMailMode.INSECURE: + # Enable Secure mode if not otherwise set + self.secure = True + + if not self.port: + # Assign our port based on our secure_mode if not otherwise + # detected + self.port = SECURE_MODES[self.secure_mode]['default_port'] + # if there is still no smtp_host then we fall back to the hostname if not self.smtp_host: self.smtp_host = self.host @@ -653,11 +658,11 @@ class NotifyEmail(NotifyBase): if login_type: # only apply additional logic to our user if a login_type # was specified. - if is_email(self.user) and \ - WebBaseLogin.EMAIL not in login_type: - # Email specified but login type - # not supported; switch it to user id - self.user = match.group('id') + if is_email(self.user): + if WebBaseLogin.EMAIL not in login_type: + # Email specified but login type + # not supported; switch it to user id + self.user = match.group('id') elif WebBaseLogin.USERID not in login_type: # user specified but login type @@ -824,7 +829,7 @@ class NotifyEmail(NotifyBase): try: self.logger.debug('Connecting to remote SMTP server...') socket_func = smtplib.SMTP - if self.secure and self.secure_mode == SecureMailMode.SSL: + if self.secure_mode == SecureMailMode.SSL: self.logger.debug('Securing connection with SSL...') socket_func = smtplib.SMTP_SSL @@ -835,7 +840,7 @@ class NotifyEmail(NotifyBase): timeout=self.socket_connect_timeout, ) - if self.secure and self.secure_mode == SecureMailMode.STARTTLS: + if self.secure_mode == SecureMailMode.STARTTLS: # Handle Secure Connections self.logger.debug('Securing connection with STARTTLS...') socket.starttls() @@ -965,8 +970,7 @@ class NotifyEmail(NotifyBase): ) # Default Port setup - default_port = \ - self.default_secure_port if self.secure else self.default_port + default_port = SECURE_MODES[self.secure_mode]['default_port'] # a simple boolean check as to whether we display our target emails # or not diff --git a/test/test_plugin_email.py b/test/test_plugin_email.py index 65f86ce0..07cb283f 100644 --- a/test/test_plugin_email.py +++ b/test/test_plugin_email.py @@ -105,6 +105,12 @@ TEST_URLS = ( ('mailtos://user:pass@nuxref.com:567', { 'instance': NotifyEmail, }), + ('mailto://user:pass@nuxref.com?mode=ssl', { + # mailto:// with mode=ssl causes us to convert to ssl + 'instance': NotifyEmail, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'mailtos://user:****@nuxref.com', + }), ('mailto://user:pass@nuxref.com:567?format=html', { 'instance': NotifyEmail, }), @@ -913,6 +919,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert 'smtp=smtp-mail.outlook.com' in obj.url() mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() response.reset_mock() # The below switches the `name` with the `to` to verify the results @@ -952,6 +959,11 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert _to[0] == 'user2@yahoo.com' assert _msg.split('\n')[-3] == 'test' + user, pw = response.login.call_args[0] + # the SMTP Server was ovr + assert pw == 'pass123' + assert user == 'user' + assert obj.url().startswith( 'mailtos://user:pass123@hotmail.com/user2%40yahoo.com') # Test that our template over-ride worked @@ -961,11 +973,38 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert 'reply=' not in obj.url() mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() response.reset_mock() # # Test outlook/hotmail lookups # + results = NotifyEmail.parse_url( + 'mailtos://user:pass123@hotmail.com') + obj = Apprise.instantiate(results, suppress_exceptions=False) + assert isinstance(obj, NotifyEmail) is True + assert obj.smtp_host == 'smtp-mail.outlook.com' + # No entries in the reply_to + assert not obj.reply_to + + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'pass123' + assert user == 'user@hotmail.com' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + results = NotifyEmail.parse_url( 'mailtos://user:pass123@outlook.com') obj = Apprise.instantiate(results, suppress_exceptions=False) @@ -974,6 +1013,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # No entries in the reply_to assert not obj.reply_to + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'pass123' + assert user == 'user@outlook.com' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + results = NotifyEmail.parse_url( 'mailtos://user:pass123@outlook.com.au') obj = Apprise.instantiate(results, suppress_exceptions=False) @@ -982,6 +1039,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # No entries in the reply_to assert not obj.reply_to + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'pass123' + assert user == 'user@outlook.com.au' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + # Consisitency Checks results = NotifyEmail.parse_url( 'mailtos://outlook.com?smtp=smtp.outlook.com' @@ -994,6 +1069,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert obj1.secure_mode == 'starttls' assert obj1.port == 587 + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj1.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'app.pw' + assert user == 'user@outlook.com' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + results = NotifyEmail.parse_url( 'mailtos://user:app.pw@outlook.com') obj2 = Apprise.instantiate(results, suppress_exceptions=False) @@ -1004,6 +1097,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert obj2.secure_mode == obj1.secure_mode assert obj2.port == obj1.port + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj2.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'app.pw' + assert user == 'user@outlook.com' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + results = NotifyEmail.parse_url( 'mailtos://user:pass123@live.com') obj = Apprise.instantiate(results, suppress_exceptions=False) @@ -1011,6 +1122,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # No entries in the reply_to assert not obj.reply_to + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'pass123' + assert user == 'user@live.com' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + results = NotifyEmail.parse_url( 'mailtos://user:pass123@hotmail.com') obj = Apprise.instantiate(results, suppress_exceptions=False) @@ -1018,6 +1147,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # No entries in the reply_to assert not obj.reply_to + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'pass123' + assert user == 'user@hotmail.com' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + # # Test Port Over-Riding # @@ -1042,9 +1189,28 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert re.match(r'.*mode=ssl.*', obj.url()) is not None # No smtp= as the SMTP server is the same as the hostname in this case assert re.match(r'.*smtp=smtp.exmail.qq.com.*', obj.url()) is not None - # URL is assembled based on provided user + # URL is assembled based on provided user (:465 is dropped because it + # is a default port when using xyz.cn) assert re.match( - r'^mailtos://abc:password@xyz.cn:465/.*', obj.url()) is not None + r'^mailtos://abc:password@xyz.cn/.*', obj.url()) is not None + + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 1 + assert response.starttls.call_count == 0 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'password' + assert user == 'abc' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() results = NotifyEmail.parse_url( "mailtos://abc:password@xyz.cn?" @@ -1061,6 +1227,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # No entries in the reply_to assert not obj.reply_to + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 1 + assert response.starttls.call_count == 0 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'password' + assert user == 'abc' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + # # Test Reply-To Email # @@ -1078,6 +1262,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # Test that our template over-ride worked assert 'reply=noreply%40example.com' in obj.url() + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'pass' + assert user == 'user' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + # # Test Reply-To Email with Name Inline # @@ -1094,3 +1296,67 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): 'mailtos://user:pass@example.com') # Test that our template over-ride worked assert 'reply=Chris+%3Cnoreply%40example.ca%3E' in obj.url() + + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 1 + assert mock_smtp_ssl.call_count == 0 + assert response.starttls.call_count == 1 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + + user, pw = response.login.call_args[0] + assert pw == 'pass' + assert user == 'user' + + mock_smtp.reset_mock() + mock_smtp_ssl.reset_mock() + response.reset_mock() + + # Fast Mail Handling + + # Test variations of username required to be an email address + # user@example.com; we also test an over-ride port on a template driven + # mailto:// entry + results = NotifyEmail.parse_url( + 'mailto://fastmail.com/?to=hello@concordium-explorer.nl' + '&user=joe@mydomain.nl&pass=abc123' + '&from=Concordium Explorer Bot') + assert isinstance(results, dict) + assert 'Concordium Explorer Bot' == \ + results['from_addr'] + assert 'joe@mydomain.nl' == results['user'] + assert results['port'] is None + assert 'fastmail.com' == results['host'] + assert 'abc123' == results['password'] + assert 'hello@concordium-explorer.nl' in results['targets'] + + obj = Apprise.instantiate(results, suppress_exceptions=False) + assert isinstance(obj, NotifyEmail) is True + + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 0 + assert obj.notify("test") is True + assert mock_smtp.call_count == 0 + assert mock_smtp_ssl.call_count == 1 + assert response.starttls.call_count == 0 + assert response.login.call_count == 1 + assert response.sendmail.call_count == 1 + # Store our Sent Arguments + # Syntax is: + # sendmail(from_addr, to_addrs, msg, mail_options=(), rcpt_options=()) + # [0] [1] [2] + _from = response.sendmail.call_args[0][0] + _to = response.sendmail.call_args[0][1] + _msg = response.sendmail.call_args[0][2] + assert _from == 'bot@concordium-explorer.nl' + assert isinstance(_to, list) + assert len(_to) == 1 + assert _to[0] == 'hello@concordium-explorer.nl' + assert _msg.split('\n')[-3] == 'test' + + user, pw = response.login.call_args[0] + assert pw == 'abc123' + assert user == 'joe@mydomain.nl'