mirror of
https://github.com/caronc/apprise.git
synced 2025-01-05 21:49:02 +01:00
Better custom email detection for from_addr
and to
if not specified (#1066)
This commit is contained in:
parent
010be0bfda
commit
5ae212fbaf
@ -496,34 +496,6 @@ class NotifyEmail(NotifyBase):
|
||||
# addresses from the URL provided
|
||||
self.from_addr = [False, '']
|
||||
|
||||
if self.user and self.host:
|
||||
# Prepare the bases of our email
|
||||
self.from_addr = [self.app_id, '{}@{}'.format(
|
||||
re.split(r'[\s@]+', self.user)[0],
|
||||
self.host,
|
||||
)]
|
||||
|
||||
if from_addr:
|
||||
result = is_email(from_addr)
|
||||
if result:
|
||||
self.from_addr = (
|
||||
result['name'] if result['name'] else False,
|
||||
result['full_email'])
|
||||
else:
|
||||
self.from_addr[0] = from_addr
|
||||
|
||||
result = is_email(self.from_addr[1])
|
||||
if not result:
|
||||
# Parse Source domain based on from_addr
|
||||
msg = 'Invalid ~From~ email specified: {}'.format(
|
||||
'{} <{}>'.format(self.from_addr[0], self.from_addr[1])
|
||||
if self.from_addr[0] else '{}'.format(self.from_addr[1]))
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Store our lookup
|
||||
self.names[self.from_addr[1]] = self.from_addr[0]
|
||||
|
||||
# Now detect the SMTP Server
|
||||
self.smtp_host = \
|
||||
smtp_host if isinstance(smtp_host, str) else ''
|
||||
@ -543,25 +515,6 @@ class NotifyEmail(NotifyBase):
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
if targets:
|
||||
# Validate recipients (to:) and drop bad ones:
|
||||
for recipient in parse_emails(targets):
|
||||
result = is_email(recipient)
|
||||
if result:
|
||||
self.targets.append(
|
||||
(result['name'] if result['name'] else False,
|
||||
result['full_email']))
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
'Dropped invalid To email '
|
||||
'({}) specified.'.format(recipient),
|
||||
)
|
||||
|
||||
else:
|
||||
# If our target email list is empty we want to add ourselves to it
|
||||
self.targets.append((False, self.from_addr[1]))
|
||||
|
||||
# Validate recipients (cc:) and drop bad ones:
|
||||
for recipient in parse_emails(cc):
|
||||
email = is_email(recipient)
|
||||
@ -613,6 +566,54 @@ class NotifyEmail(NotifyBase):
|
||||
# Apply any defaults based on certain known configurations
|
||||
self.NotifyEmailDefaults(secure_mode=secure_mode, **kwargs)
|
||||
|
||||
if self.user and self.host:
|
||||
# Prepare the bases of our email
|
||||
self.from_addr = [self.app_id, '{}@{}'.format(
|
||||
re.split(r'[\s@]+', self.user)[0],
|
||||
self.host,
|
||||
)]
|
||||
|
||||
if from_addr:
|
||||
result = is_email(from_addr)
|
||||
if result:
|
||||
self.from_addr = (
|
||||
result['name'] if result['name'] else False,
|
||||
result['full_email'])
|
||||
else:
|
||||
# Only update the string but use the already detected info
|
||||
self.from_addr[0] = from_addr
|
||||
|
||||
result = is_email(self.from_addr[1])
|
||||
if not result:
|
||||
# Parse Source domain based on from_addr
|
||||
msg = 'Invalid ~From~ email specified: {}'.format(
|
||||
'{} <{}>'.format(self.from_addr[0], self.from_addr[1])
|
||||
if self.from_addr[0] else '{}'.format(self.from_addr[1]))
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Store our lookup
|
||||
self.names[self.from_addr[1]] = self.from_addr[0]
|
||||
|
||||
if targets:
|
||||
# Validate recipients (to:) and drop bad ones:
|
||||
for recipient in parse_emails(targets):
|
||||
result = is_email(recipient)
|
||||
if result:
|
||||
self.targets.append(
|
||||
(result['name'] if result['name'] else False,
|
||||
result['full_email']))
|
||||
continue
|
||||
|
||||
self.logger.warning(
|
||||
'Dropped invalid To email '
|
||||
'({}) specified.'.format(recipient),
|
||||
)
|
||||
|
||||
else:
|
||||
# If our target email list is empty we want to add ourselves to it
|
||||
self.targets.append((False, self.from_addr[1]))
|
||||
|
||||
if not self.secure and self.secure_mode != SecureMailMode.INSECURE:
|
||||
# Enable Secure mode if not otherwise set
|
||||
self.secure = True
|
||||
@ -679,9 +680,7 @@ class NotifyEmail(NotifyBase):
|
||||
# was specified, then we default to having them all set (which
|
||||
# basically implies that there are no restrictions and use use
|
||||
# whatever was specified)
|
||||
login_type = EMAIL_TEMPLATES[i][2]\
|
||||
.get('login_type', [])
|
||||
|
||||
login_type = EMAIL_TEMPLATES[i][2].get('login_type', [])
|
||||
if login_type:
|
||||
# only apply additional logic to our user if a login_type
|
||||
# was specified.
|
||||
@ -691,6 +690,10 @@ class NotifyEmail(NotifyBase):
|
||||
# not supported; switch it to user id
|
||||
self.user = match.group('id')
|
||||
|
||||
else:
|
||||
# Enforce our host information
|
||||
self.host = self.user.split('@')[1]
|
||||
|
||||
elif WebBaseLogin.USERID not in login_type:
|
||||
# user specified but login type
|
||||
# not supported; switch it to email
|
||||
|
@ -1421,6 +1421,113 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl):
|
||||
mock_smtp_ssl.reset_mock()
|
||||
response.reset_mock()
|
||||
|
||||
# Issue github.com/caronc/apprise/issue/1040
|
||||
# mailto://fastmail.com?user=username@customdomain.com \
|
||||
# &to=username@customdomain.com&pass=password123
|
||||
#
|
||||
# should just have to be written like (to= omitted)
|
||||
# mailto://fastmail.com?user=username@customdomain.com&pass=password123
|
||||
#
|
||||
results = NotifyEmail.parse_url(
|
||||
'mailto://fastmail.com?user=username@customdomain.com'
|
||||
'&pass=password123')
|
||||
assert isinstance(results, dict)
|
||||
assert 'username@customdomain.com' == results['user']
|
||||
assert results['from_addr'] == ''
|
||||
assert results['port'] is None
|
||||
assert 'fastmail.com' == results['host']
|
||||
assert 'password123' == results['password']
|
||||
assert results['smtp_host'] == ''
|
||||
|
||||
obj = Apprise.instantiate(results, suppress_exceptions=False)
|
||||
assert isinstance(obj, NotifyEmail) is True
|
||||
# During instantiation, our variables get detected
|
||||
assert obj.smtp_host == 'smtp.fastmail.com'
|
||||
assert obj.from_addr == ['Apprise', 'username@customdomain.com']
|
||||
assert obj.host == 'customdomain.com'
|
||||
# detected from
|
||||
assert (False, 'username@customdomain.com') in obj.targets
|
||||
|
||||
assert mock_smtp.call_count == 0
|
||||
assert mock_smtp_ssl.call_count == 0
|
||||
assert obj.notify("test") is True
|
||||
assert mock_smtp.call_count == 0
|
||||
assert mock_smtp_ssl.call_count == 1
|
||||
assert response.starttls.call_count == 0
|
||||
assert response.login.call_count == 1
|
||||
assert response.sendmail.call_count == 1
|
||||
# Store our Sent Arguments
|
||||
# Syntax is:
|
||||
# sendmail(from_addr, to_addrs, msg, mail_options=(), rcpt_options=())
|
||||
# [0] [1] [2]
|
||||
_from = response.sendmail.call_args[0][0]
|
||||
_to = response.sendmail.call_args[0][1]
|
||||
_msg = response.sendmail.call_args[0][2]
|
||||
assert _from == 'username@customdomain.com'
|
||||
assert isinstance(_to, list)
|
||||
assert len(_to) == 1
|
||||
assert _to[0] == 'username@customdomain.com'
|
||||
assert _msg.split('\n')[-3] == 'test'
|
||||
|
||||
user, pw = response.login.call_args[0]
|
||||
assert pw == 'password123'
|
||||
assert user == 'username@customdomain.com'
|
||||
|
||||
mock_smtp.reset_mock()
|
||||
mock_smtp_ssl.reset_mock()
|
||||
response.reset_mock()
|
||||
|
||||
# Similar test as above, just showing that we can over-ride the From=
|
||||
# with these custom URLs as well and not require a full email
|
||||
results = NotifyEmail.parse_url(
|
||||
'mailto://fastmail.com?user=username@customdomain.com'
|
||||
'&pass=password123&from=Custom')
|
||||
assert isinstance(results, dict)
|
||||
assert 'username@customdomain.com' == results['user']
|
||||
assert results['from_addr'] == 'Custom'
|
||||
assert results['port'] is None
|
||||
assert 'fastmail.com' == results['host']
|
||||
assert 'password123' == results['password']
|
||||
assert results['smtp_host'] == ''
|
||||
|
||||
obj = Apprise.instantiate(results, suppress_exceptions=False)
|
||||
assert isinstance(obj, NotifyEmail) is True
|
||||
# During instantiation, our variables get detected
|
||||
assert obj.smtp_host == 'smtp.fastmail.com'
|
||||
assert obj.from_addr == ['Custom', 'username@customdomain.com']
|
||||
assert obj.host == 'customdomain.com'
|
||||
# detected from
|
||||
assert (False, 'username@customdomain.com') in obj.targets
|
||||
|
||||
assert mock_smtp.call_count == 0
|
||||
assert mock_smtp_ssl.call_count == 0
|
||||
assert obj.notify("test") is True
|
||||
assert mock_smtp.call_count == 0
|
||||
assert mock_smtp_ssl.call_count == 1
|
||||
assert response.starttls.call_count == 0
|
||||
assert response.login.call_count == 1
|
||||
assert response.sendmail.call_count == 1
|
||||
# Store our Sent Arguments
|
||||
# Syntax is:
|
||||
# sendmail(from_addr, to_addrs, msg, mail_options=(), rcpt_options=())
|
||||
# [0] [1] [2]
|
||||
_from = response.sendmail.call_args[0][0]
|
||||
_to = response.sendmail.call_args[0][1]
|
||||
_msg = response.sendmail.call_args[0][2]
|
||||
assert _from == 'username@customdomain.com'
|
||||
assert isinstance(_to, list)
|
||||
assert len(_to) == 1
|
||||
assert _to[0] == 'username@customdomain.com'
|
||||
assert _msg.split('\n')[-3] == 'test'
|
||||
|
||||
user, pw = response.login.call_args[0]
|
||||
assert pw == 'password123'
|
||||
assert user == 'username@customdomain.com'
|
||||
|
||||
mock_smtp.reset_mock()
|
||||
mock_smtp_ssl.reset_mock()
|
||||
response.reset_mock()
|
||||
|
||||
# Issue github.com/caronc/apprise/issue/941
|
||||
|
||||
# mail domain = mail-domain.com
|
||||
@ -1502,6 +1609,9 @@ def test_plugin_email_plus_in_toemail(mock_smtp, mock_smtp_ssl):
|
||||
|
||||
assert len(obj.targets) == 1
|
||||
assert ('Plus Support', 'test+notification@gmail.com') in obj.targets
|
||||
assert obj.smtp_host == 'smtp.gmail.com'
|
||||
assert obj.from_addr == ['Apprise', 'user@gmail.com']
|
||||
assert obj.host == 'gmail.com'
|
||||
|
||||
assert mock_smtp.call_count == 0
|
||||
assert mock_smtp_ssl.call_count == 0
|
||||
|
Loading…
Reference in New Issue
Block a user