Refactored SSL/Auth handling of Emails (#774)

This commit is contained in:
Chris Caron 2022-11-23 07:13:38 -05:00 committed by GitHub
parent eb85dca076
commit 6fb8fbab19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 305 additions and 35 deletions

View File

@ -63,15 +63,23 @@ class WebBaseLogin:
# Secure Email Modes # Secure Email Modes
class SecureMailMode: class SecureMailMode:
INSECURE = "insecure"
SSL = "ssl" SSL = "ssl"
STARTTLS = "starttls" STARTTLS = "starttls"
# Define all of the secure modes (used during validation) # Define all of the secure modes (used during validation)
SECURE_MODES = ( SECURE_MODES = {
SecureMailMode.SSL, SecureMailMode.STARTTLS: {
SecureMailMode.STARTTLS, 'default_port': 587,
) },
SecureMailMode.SSL: {
'default_port': 465,
},
SecureMailMode.INSECURE: {
'default_port': 25,
},
}
# To attempt to make this script stupid proof, if we detect an email address # To attempt to make this script stupid proof, if we detect an email address
# that is part of the this table, we can pre-use a lot more defaults if they # that is part of the this table, we can pre-use a lot more defaults if they
@ -328,15 +336,6 @@ class NotifyEmail(NotifyBase):
# Default Notify Format # Default Notify Format
notify_format = NotifyFormat.HTML notify_format = NotifyFormat.HTML
# Default Non-Encryption Port
default_port = 25
# Default Secure Port
default_secure_port = 587
# Default Secure Mode
default_secure_mode = SecureMailMode.STARTTLS
# Default SMTP Timeout (in seconds) # Default SMTP Timeout (in seconds)
socket_connect_timeout = 15 socket_connect_timeout = 15
@ -446,14 +445,6 @@ class NotifyEmail(NotifyBase):
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
# Handle SMTP vs SMTPS (Secure vs UnSecure)
if not self.port:
if self.secure:
self.port = self.default_secure_port
else:
self.port = self.default_port
# Acquire Email 'To' # Acquire Email 'To'
self.targets = list() self.targets = list()
@ -511,9 +502,14 @@ class NotifyEmail(NotifyBase):
smtp_host if isinstance(smtp_host, str) else '' smtp_host if isinstance(smtp_host, str) else ''
# Now detect secure mode # Now detect secure mode
self.secure_mode = self.default_secure_mode \ if secure_mode:
if not isinstance(secure_mode, str) \ self.secure_mode = None \
else secure_mode.lower() if not isinstance(secure_mode, str) \
else secure_mode.lower()
else:
self.secure_mode = SecureMailMode.INSECURE \
if not self.secure else self.template_args['mode']['default']
if self.secure_mode not in SECURE_MODES: if self.secure_mode not in SECURE_MODES:
msg = 'The secure mode specified ({}) is invalid.'\ msg = 'The secure mode specified ({}) is invalid.'\
.format(secure_mode) .format(secure_mode)
@ -590,6 +586,15 @@ class NotifyEmail(NotifyBase):
# Apply any defaults based on certain known configurations # Apply any defaults based on certain known configurations
self.NotifyEmailDefaults(secure_mode=secure_mode, **kwargs) self.NotifyEmailDefaults(secure_mode=secure_mode, **kwargs)
if not self.secure and self.secure_mode != SecureMailMode.INSECURE:
# Enable Secure mode if not otherwise set
self.secure = True
if not self.port:
# Assign our port based on our secure_mode if not otherwise
# detected
self.port = SECURE_MODES[self.secure_mode]['default_port']
# if there is still no smtp_host then we fall back to the hostname # if there is still no smtp_host then we fall back to the hostname
if not self.smtp_host: if not self.smtp_host:
self.smtp_host = self.host self.smtp_host = self.host
@ -653,11 +658,11 @@ class NotifyEmail(NotifyBase):
if login_type: if login_type:
# only apply additional logic to our user if a login_type # only apply additional logic to our user if a login_type
# was specified. # was specified.
if is_email(self.user) and \ if is_email(self.user):
WebBaseLogin.EMAIL not in login_type: if WebBaseLogin.EMAIL not in login_type:
# Email specified but login type # Email specified but login type
# not supported; switch it to user id # not supported; switch it to user id
self.user = match.group('id') self.user = match.group('id')
elif WebBaseLogin.USERID not in login_type: elif WebBaseLogin.USERID not in login_type:
# user specified but login type # user specified but login type
@ -824,7 +829,7 @@ class NotifyEmail(NotifyBase):
try: try:
self.logger.debug('Connecting to remote SMTP server...') self.logger.debug('Connecting to remote SMTP server...')
socket_func = smtplib.SMTP socket_func = smtplib.SMTP
if self.secure and self.secure_mode == SecureMailMode.SSL: if self.secure_mode == SecureMailMode.SSL:
self.logger.debug('Securing connection with SSL...') self.logger.debug('Securing connection with SSL...')
socket_func = smtplib.SMTP_SSL socket_func = smtplib.SMTP_SSL
@ -835,7 +840,7 @@ class NotifyEmail(NotifyBase):
timeout=self.socket_connect_timeout, timeout=self.socket_connect_timeout,
) )
if self.secure and self.secure_mode == SecureMailMode.STARTTLS: if self.secure_mode == SecureMailMode.STARTTLS:
# Handle Secure Connections # Handle Secure Connections
self.logger.debug('Securing connection with STARTTLS...') self.logger.debug('Securing connection with STARTTLS...')
socket.starttls() socket.starttls()
@ -965,8 +970,7 @@ class NotifyEmail(NotifyBase):
) )
# Default Port setup # Default Port setup
default_port = \ default_port = SECURE_MODES[self.secure_mode]['default_port']
self.default_secure_port if self.secure else self.default_port
# a simple boolean check as to whether we display our target emails # a simple boolean check as to whether we display our target emails
# or not # or not

View File

@ -105,6 +105,12 @@ TEST_URLS = (
('mailtos://user:pass@nuxref.com:567', { ('mailtos://user:pass@nuxref.com:567', {
'instance': NotifyEmail, 'instance': NotifyEmail,
}), }),
('mailto://user:pass@nuxref.com?mode=ssl', {
# mailto:// with mode=ssl causes us to convert to ssl
'instance': NotifyEmail,
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'mailtos://user:****@nuxref.com',
}),
('mailto://user:pass@nuxref.com:567?format=html', { ('mailto://user:pass@nuxref.com:567?format=html', {
'instance': NotifyEmail, 'instance': NotifyEmail,
}), }),
@ -913,6 +919,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
assert 'smtp=smtp-mail.outlook.com' in obj.url() assert 'smtp=smtp-mail.outlook.com' in obj.url()
mock_smtp.reset_mock() mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock() response.reset_mock()
# The below switches the `name` with the `to` to verify the results # The below switches the `name` with the `to` to verify the results
@ -952,6 +959,11 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
assert _to[0] == 'user2@yahoo.com' assert _to[0] == 'user2@yahoo.com'
assert _msg.split('\n')[-3] == 'test' assert _msg.split('\n')[-3] == 'test'
user, pw = response.login.call_args[0]
# the SMTP Server was ovr
assert pw == 'pass123'
assert user == 'user'
assert obj.url().startswith( assert obj.url().startswith(
'mailtos://user:pass123@hotmail.com/user2%40yahoo.com') 'mailtos://user:pass123@hotmail.com/user2%40yahoo.com')
# Test that our template over-ride worked # Test that our template over-ride worked
@ -961,11 +973,38 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
assert 'reply=' not in obj.url() assert 'reply=' not in obj.url()
mock_smtp.reset_mock() mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock() response.reset_mock()
# #
# Test outlook/hotmail lookups # Test outlook/hotmail lookups
# #
results = NotifyEmail.parse_url(
'mailtos://user:pass123@hotmail.com')
obj = Apprise.instantiate(results, suppress_exceptions=False)
assert isinstance(obj, NotifyEmail) is True
assert obj.smtp_host == 'smtp-mail.outlook.com'
# No entries in the reply_to
assert not obj.reply_to
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'pass123'
assert user == 'user@hotmail.com'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
results = NotifyEmail.parse_url( results = NotifyEmail.parse_url(
'mailtos://user:pass123@outlook.com') 'mailtos://user:pass123@outlook.com')
obj = Apprise.instantiate(results, suppress_exceptions=False) obj = Apprise.instantiate(results, suppress_exceptions=False)
@ -974,6 +1013,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
# No entries in the reply_to # No entries in the reply_to
assert not obj.reply_to assert not obj.reply_to
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'pass123'
assert user == 'user@outlook.com'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
results = NotifyEmail.parse_url( results = NotifyEmail.parse_url(
'mailtos://user:pass123@outlook.com.au') 'mailtos://user:pass123@outlook.com.au')
obj = Apprise.instantiate(results, suppress_exceptions=False) obj = Apprise.instantiate(results, suppress_exceptions=False)
@ -982,6 +1039,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
# No entries in the reply_to # No entries in the reply_to
assert not obj.reply_to assert not obj.reply_to
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'pass123'
assert user == 'user@outlook.com.au'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
# Consisitency Checks # Consisitency Checks
results = NotifyEmail.parse_url( results = NotifyEmail.parse_url(
'mailtos://outlook.com?smtp=smtp.outlook.com' 'mailtos://outlook.com?smtp=smtp.outlook.com'
@ -994,6 +1069,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
assert obj1.secure_mode == 'starttls' assert obj1.secure_mode == 'starttls'
assert obj1.port == 587 assert obj1.port == 587
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj1.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'app.pw'
assert user == 'user@outlook.com'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
results = NotifyEmail.parse_url( results = NotifyEmail.parse_url(
'mailtos://user:app.pw@outlook.com') 'mailtos://user:app.pw@outlook.com')
obj2 = Apprise.instantiate(results, suppress_exceptions=False) obj2 = Apprise.instantiate(results, suppress_exceptions=False)
@ -1004,6 +1097,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
assert obj2.secure_mode == obj1.secure_mode assert obj2.secure_mode == obj1.secure_mode
assert obj2.port == obj1.port assert obj2.port == obj1.port
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj2.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'app.pw'
assert user == 'user@outlook.com'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
results = NotifyEmail.parse_url( results = NotifyEmail.parse_url(
'mailtos://user:pass123@live.com') 'mailtos://user:pass123@live.com')
obj = Apprise.instantiate(results, suppress_exceptions=False) obj = Apprise.instantiate(results, suppress_exceptions=False)
@ -1011,6 +1122,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
# No entries in the reply_to # No entries in the reply_to
assert not obj.reply_to assert not obj.reply_to
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'pass123'
assert user == 'user@live.com'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
results = NotifyEmail.parse_url( results = NotifyEmail.parse_url(
'mailtos://user:pass123@hotmail.com') 'mailtos://user:pass123@hotmail.com')
obj = Apprise.instantiate(results, suppress_exceptions=False) obj = Apprise.instantiate(results, suppress_exceptions=False)
@ -1018,6 +1147,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
# No entries in the reply_to # No entries in the reply_to
assert not obj.reply_to assert not obj.reply_to
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'pass123'
assert user == 'user@hotmail.com'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
# #
# Test Port Over-Riding # Test Port Over-Riding
# #
@ -1042,9 +1189,28 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
assert re.match(r'.*mode=ssl.*', obj.url()) is not None assert re.match(r'.*mode=ssl.*', obj.url()) is not None
# No smtp= as the SMTP server is the same as the hostname in this case # No smtp= as the SMTP server is the same as the hostname in this case
assert re.match(r'.*smtp=smtp.exmail.qq.com.*', obj.url()) is not None assert re.match(r'.*smtp=smtp.exmail.qq.com.*', obj.url()) is not None
# URL is assembled based on provided user # URL is assembled based on provided user (:465 is dropped because it
# is a default port when using xyz.cn)
assert re.match( assert re.match(
r'^mailtos://abc:password@xyz.cn:465/.*', obj.url()) is not None r'^mailtos://abc:password@xyz.cn/.*', obj.url()) is not None
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 1
assert response.starttls.call_count == 0
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'password'
assert user == 'abc'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
results = NotifyEmail.parse_url( results = NotifyEmail.parse_url(
"mailtos://abc:password@xyz.cn?" "mailtos://abc:password@xyz.cn?"
@ -1061,6 +1227,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
# No entries in the reply_to # No entries in the reply_to
assert not obj.reply_to assert not obj.reply_to
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 1
assert response.starttls.call_count == 0
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'password'
assert user == 'abc'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
# #
# Test Reply-To Email # Test Reply-To Email
# #
@ -1078,6 +1262,24 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
# Test that our template over-ride worked # Test that our template over-ride worked
assert 'reply=noreply%40example.com' in obj.url() assert 'reply=noreply%40example.com' in obj.url()
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'pass'
assert user == 'user'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
# #
# Test Reply-To Email with Name Inline # Test Reply-To Email with Name Inline
# #
@ -1094,3 +1296,67 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
'mailtos://user:pass@example.com') 'mailtos://user:pass@example.com')
# Test that our template over-ride worked # Test that our template over-ride worked
assert 'reply=Chris+%3Cnoreply%40example.ca%3E' in obj.url() assert 'reply=Chris+%3Cnoreply%40example.ca%3E' in obj.url()
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
assert response.starttls.call_count == 1
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
user, pw = response.login.call_args[0]
assert pw == 'pass'
assert user == 'user'
mock_smtp.reset_mock()
mock_smtp_ssl.reset_mock()
response.reset_mock()
# Fast Mail Handling
# Test variations of username required to be an email address
# user@example.com; we also test an over-ride port on a template driven
# mailto:// entry
results = NotifyEmail.parse_url(
'mailto://fastmail.com/?to=hello@concordium-explorer.nl'
'&user=joe@mydomain.nl&pass=abc123'
'&from=Concordium Explorer Bot<bot@concordium-explorer.nl>')
assert isinstance(results, dict)
assert 'Concordium Explorer Bot<bot@concordium-explorer.nl>' == \
results['from_addr']
assert 'joe@mydomain.nl' == results['user']
assert results['port'] is None
assert 'fastmail.com' == results['host']
assert 'abc123' == results['password']
assert 'hello@concordium-explorer.nl' in results['targets']
obj = Apprise.instantiate(results, suppress_exceptions=False)
assert isinstance(obj, NotifyEmail) is True
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 0
assert obj.notify("test") is True
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 1
assert response.starttls.call_count == 0
assert response.login.call_count == 1
assert response.sendmail.call_count == 1
# Store our Sent Arguments
# Syntax is:
# sendmail(from_addr, to_addrs, msg, mail_options=(), rcpt_options=())
# [0] [1] [2]
_from = response.sendmail.call_args[0][0]
_to = response.sendmail.call_args[0][1]
_msg = response.sendmail.call_args[0][2]
assert _from == 'bot@concordium-explorer.nl'
assert isinstance(_to, list)
assert len(_to) == 1
assert _to[0] == 'hello@concordium-explorer.nl'
assert _msg.split('\n')[-3] == 'test'
user, pw = response.login.call_args[0]
assert pw == 'abc123'
assert user == 'joe@mydomain.nl'