Advanced email parsing added; eg: Full Name email@domain.com (#276)

This commit is contained in:
Chris Caron 2020-08-18 14:05:29 -04:00 committed by GitHub
parent fbaf1e4059
commit 6e1b8a0bd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 718 additions and 165 deletions

View File

@ -33,7 +33,7 @@ from .common import NotifyFormat
from .common import MATCH_ALL_TAG
from .utils import is_exclusive_match
from .utils import parse_list
from .utils import split_urls
from .utils import parse_urls
from .logger import logger
from .AppriseAsset import AppriseAsset
@ -197,7 +197,7 @@ class Apprise(object):
if isinstance(servers, six.string_types):
# build our server list
servers = split_urls(servers)
servers = parse_urls(servers)
if len(servers) == 0:
return False

View File

@ -41,8 +41,7 @@ from ..URLBase import PrivacyMode
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import is_email
from ..utils import parse_list
from ..utils import GET_EMAIL_RE
from ..utils import parse_emails
from ..AppriseLocale import gettext_lazy as _
# Globally Default encoding mode set to Quoted Printable.
@ -401,8 +400,8 @@ class NotifyEmail(NotifyBase):
except (ValueError, TypeError):
self.timeout = self.connect_timeout
# Acquire targets
self.targets = parse_list(targets)
# Acquire Email 'To'
self.targets = list()
# Acquire Carbon Copies
self.cc = set()
@ -410,9 +409,11 @@ class NotifyEmail(NotifyBase):
# Acquire Blind Carbon Copies
self.bcc = set()
# For tracking our email -> name lookups
self.names = {}
# Now we want to construct the To and From email
# addresses from the URL provided
self.from_name = from_name
self.from_addr = from_addr
if self.user and not self.from_addr:
@ -422,15 +423,18 @@ class NotifyEmail(NotifyBase):
self.host,
)
if not is_email(self.from_addr):
result = is_email(self.from_addr)
if not result:
# Parse Source domain based on from_addr
msg = 'Invalid ~From~ email specified: {}'.format(self.from_addr)
self.logger.warning(msg)
raise TypeError(msg)
# If our target email list is empty we want to add ourselves to it
if len(self.targets) == 0:
self.targets.append(self.from_addr)
# Store our email address
self.from_addr = result['full_email']
# Set our from name
self.from_name = from_name if from_name else result['name']
# Now detect the SMTP Server
self.smtp_host = \
@ -446,11 +450,35 @@ class NotifyEmail(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_list(cc):
if targets:
# Validate recipients (to:) and drop bad ones:
for recipient in parse_emails(targets):
result = is_email(recipient)
if result:
self.targets.append(
(result['name'] if result['name'] else False,
result['full_email']))
continue
if GET_EMAIL_RE.match(recipient):
self.cc.add(recipient)
self.logger.warning(
'Dropped invalid To email '
'({}) specified.'.format(recipient),
)
else:
# If our target email list is empty we want to add ourselves to it
self.targets.append(
(self.from_name if self.from_name else False, self.from_addr))
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_emails(cc):
email = is_email(recipient)
if email:
self.cc.add(email['full_email'])
# Index our name (if one exists)
self.names[email['full_email']] = \
email['name'] if email['name'] else False
continue
self.logger.warning(
@ -459,10 +487,14 @@ class NotifyEmail(NotifyBase):
)
# Validate recipients (bcc:) and drop bad ones:
for recipient in parse_list(bcc):
for recipient in parse_emails(bcc):
email = is_email(recipient)
if email:
self.bcc.add(email['full_email'])
if GET_EMAIL_RE.match(recipient):
self.bcc.add(recipient)
# Index our name (if one exists)
self.names[email['full_email']] = \
email['name'] if email['name'] else False
continue
self.logger.warning(
@ -556,29 +588,51 @@ class NotifyEmail(NotifyBase):
# error tracking (used for function return)
has_error = False
if not self.targets:
# There is no one to email; we're done
self.logger.warning(
'There are no Email recipients to notify')
return False
# Create a copy of the targets list
emails = list(self.targets)
while len(emails):
# Get our email to notify
to_addr = emails.pop(0)
if not is_email(to_addr):
self.logger.warning(
'Invalid ~To~ email specified: {}'.format(to_addr))
has_error = True
continue
to_name, to_addr = emails.pop(0)
# Strip target out of cc list if in To or Bcc
cc = (self.cc - self.bcc - set([to_addr]))
# Strip target out of bcc list if in To
bcc = (self.bcc - set([to_addr]))
try:
# Format our cc addresses to support the Name field
cc = [formataddr(
(self.names.get(addr, False), addr), charset='utf-8')
for addr in cc]
# Format our bcc addresses to support the Name field
bcc = [formataddr(
(self.names.get(addr, False), addr), charset='utf-8')
for addr in bcc]
except TypeError:
# Python v2.x Support (no charset keyword)
# Format our cc addresses to support the Name field
cc = [formataddr(
(self.names.get(addr, False), addr)) for addr in cc]
# Format our bcc addresses to support the Name field
bcc = [formataddr(
(self.names.get(addr, False), addr)) for addr in bcc]
self.logger.debug(
'Email From: {} <{}>'.format(from_name, self.from_addr))
self.logger.debug('Email To: {}'.format(to_addr))
if len(cc):
if cc:
self.logger.debug('Email Cc: {}'.format(', '.join(cc)))
if len(bcc):
if bcc:
self.logger.debug('Email Bcc: {}'.format(', '.join(bcc)))
self.logger.debug('Login ID: {}'.format(self.user))
self.logger.debug(
@ -597,13 +651,13 @@ class NotifyEmail(NotifyBase):
base['From'] = formataddr(
(from_name if from_name else False, self.from_addr),
charset='utf-8')
base['To'] = formataddr((False, to_addr), charset='utf-8')
base['To'] = formataddr((to_name, to_addr), charset='utf-8')
except TypeError:
# Python v2.x Support (no charset keyword)
base['From'] = formataddr(
(from_name if from_name else False, self.from_addr))
base['To'] = formataddr((False, to_addr))
base['To'] = formataddr((to_name, to_addr))
base['Cc'] = ','.join(cc)
base['Date'] = \
@ -706,7 +760,6 @@ class NotifyEmail(NotifyBase):
# Define an URL parameters
params = {
'from': self.from_addr,
'name': self.from_name,
'mode': self.secure_mode,
'smtp': self.smtp_host,
'timeout': self.timeout,
@ -716,13 +769,22 @@ class NotifyEmail(NotifyBase):
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
if self.from_name:
params['name'] = self.from_name
if len(self.cc) > 0:
# Handle our Carbon Copy Addresses
params['cc'] = ','.join(self.cc)
params['cc'] = ','.join(
['{}{}'.format(
'' if not e not in self.names
else '{}:'.format(self.names[e]), e) for e in self.cc])
if len(self.bcc) > 0:
# Handle our Blind Carbon Copy Addresses
params['bcc'] = ','.join(self.bcc)
params['bcc'] = ','.join(
['{}{}'.format(
'' if not e not in self.names
else '{}:'.format(self.names[e]), e) for e in self.bcc])
# pull email suffix from username (if present)
user = None if not self.user else self.user.split('@')[0]
@ -748,7 +810,8 @@ class NotifyEmail(NotifyBase):
# a simple boolean check as to whether we display our target emails
# or not
has_targets = \
not (len(self.targets) == 1 and self.targets[0] == self.from_addr)
not (len(self.targets) == 1
and self.targets[0][1] == self.from_addr)
return '{schema}://{auth}{hostname}{port}/{targets}?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
@ -758,7 +821,9 @@ class NotifyEmail(NotifyBase):
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
targets='' if not has_targets else '/'.join(
[NotifyEmail.quote(x, safe='') for x in self.targets]),
[NotifyEmail.quote('{}{}'.format(
'' if not e[0] else '{}:'.format(e[0]), e[1]),
safe='') for e in self.targets]),
params=NotifyEmail.urlencode(params),
)
@ -792,8 +857,7 @@ class NotifyEmail(NotifyBase):
# Attempt to detect 'to' email address
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyEmail.parse_list(results['qsd']['to'])
results['targets'].append(results['qsd']['to'])
if 'name' in results['qsd'] and len(results['qsd']['name']):
# Extract from name to associate with from address
@ -814,13 +878,11 @@ class NotifyEmail(NotifyBase):
# Handle Carbon Copy Addresses
if 'cc' in results['qsd'] and len(results['qsd']['cc']):
results['cc'] = \
NotifyEmail.parse_list(results['qsd']['cc'])
results['cc'] = results['qsd']['cc']
# Handle Blind Carbon Copy Addresses
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
results['bcc'] = \
NotifyEmail.parse_list(results['qsd']['bcc'])
results['bcc'] = results['qsd']['bcc']
results['from_addr'] = from_addr
results['smtp_host'] = smtp_host

View File

@ -66,7 +66,7 @@ from ..URLBase import PrivacyMode
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import is_email
from ..utils import parse_list
from ..utils import parse_emails
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
@ -152,6 +152,14 @@ class NotifyOffice365(NotifyBase):
'to': {
'alias_of': 'targets',
},
'cc': {
'name': _('Carbon Copy'),
'type': 'list:string',
},
'bcc': {
'name': _('Blind Carbon Copy'),
'type': 'list:string',
},
'oauth_id': {
'alias_of': 'client_id',
},
@ -161,7 +169,7 @@ class NotifyOffice365(NotifyBase):
})
def __init__(self, tenant, email, client_id, secret,
targets=None, **kwargs):
targets=None, cc=None, bcc=None, **kwargs):
"""
Initialize Office 365 Object
"""
@ -176,12 +184,15 @@ class NotifyOffice365(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
if not is_email(email):
result = is_email(email)
if not result:
msg = 'An invalid Office 365 Email Account ID' \
'({}) was specified.'.format(email)
self.logger.warning(msg)
raise TypeError(msg)
self.email = email
# Otherwise store our the email address
self.email = result['full_email']
# Client Key (associated with generated OAuth2 Login)
self.client_id = validate_regex(
@ -200,23 +211,68 @@ class NotifyOffice365(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
# For tracking our email -> name lookups
self.names = {}
# Acquire Carbon Copies
self.cc = set()
# Acquire Blind Carbon Copies
self.bcc = set()
# Parse our targets
self.targets = list()
targets = parse_list(targets)
if targets:
for target in targets:
# Validate targets and drop bad ones:
if not is_email(target):
self.logger.warning(
'Dropped invalid email specified: {}'.format(target))
for recipient in parse_emails(targets):
# Validate recipients (to:) and drop bad ones:
result = is_email(recipient)
if result:
# Add our email to our target list
self.targets.append(
(result['name'] if result['name'] else False,
result['full_email']))
continue
# Add our email to our target list
self.targets.append(target)
self.logger.warning(
'Dropped invalid To email ({}) specified.'
.format(recipient))
else:
# Default to adding ourselves
self.targets.append(self.email)
# If our target email list is empty we want to add ourselves to it
self.targets.append((False, self.email))
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_emails(cc):
email = is_email(recipient)
if email:
self.cc.add(email['full_email'])
# Index our name (if one exists)
self.names[email['full_email']] = \
email['name'] if email['name'] else False
continue
self.logger.warning(
'Dropped invalid Carbon Copy email '
'({}) specified.'.format(recipient),
)
# Validate recipients (bcc:) and drop bad ones:
for recipient in parse_emails(bcc):
email = is_email(recipient)
if email:
self.bcc.add(email['full_email'])
# Index our name (if one exists)
self.names[email['full_email']] = \
email['name'] if email['name'] else False
continue
self.logger.warning(
'Dropped invalid Blind Carbon Copy email '
'({}) specified.'.format(recipient),
)
# Our token is acquired upon a successful login
self.token = None
@ -237,7 +293,7 @@ class NotifyOffice365(NotifyBase):
if not self.targets:
# There is no one to email; we're done
self.logger.warning(
'There are no Office 365 recipients to notify')
'There are no Email recipients to notify')
return False
# Setup our Content Type
@ -256,8 +312,8 @@ class NotifyOffice365(NotifyBase):
'SaveToSentItems': 'false'
}
# Create a copy of the targets list
targets = list(self.targets)
# Create a copy of the email list
emails = list(self.targets)
# Define our URL to post to
url = '{graph_url}/v1.0/users/{email}/sendmail'.format(
@ -265,17 +321,7 @@ class NotifyOffice365(NotifyBase):
graph_url=self.graph_url,
)
while len(targets):
# Get our target to notify
target = targets.pop(0)
# Prepare our email
payload['Message']['ToRecipients'] = [{
'EmailAddress': {
'Address': target
}
}]
while len(emails):
# authenticate ourselves if we aren't already; but this function
# also tracks if our token we have is still valid and will
# re-authenticate ourselves if nessisary.
@ -283,9 +329,68 @@ class NotifyOffice365(NotifyBase):
# We could not authenticate ourselves; we're done
return False
# Get our email to notify
to_name, to_addr = emails.pop(0)
# Strip target out of cc list if in To or Bcc
cc = (self.cc - self.bcc - set([to_addr]))
# Strip target out of bcc list if in To
bcc = (self.bcc - set([to_addr]))
# Prepare our email
payload['Message']['ToRecipients'] = [{
'EmailAddress': {
'Address': to_addr
}
}]
if to_name:
# Apply our To Name
payload['Message']['ToRecipients'][0]['EmailAddress']['Name'] \
= to_name
self.logger.debug('Email To: {}'.format(to_addr))
if cc:
# Prepare our CC list
payload['Message']['CcRecipients'] = []
for addr in cc:
_payload = {'Address': addr}
if self.names.get(addr):
_payload['Name'] = self.names[addr]
# Store our address in our payload
payload['Message']['CcRecipients']\
.append({'EmailAddress': _payload})
self.logger.debug('Email Cc: {}'.format(', '.join(
['{}{}'.format(
'' if self.names.get(e)
else '{}: '.format(self.names[e]), e) for e in cc])))
if bcc:
# Prepare our CC list
payload['Message']['BccRecipients'] = []
for addr in bcc:
_payload = {'Address': addr}
if self.names.get(addr):
_payload['Name'] = self.names[addr]
# Store our address in our payload
payload['Message']['BccRecipients']\
.append({'EmailAddress': _payload})
self.logger.debug('Email Bcc: {}'.format(', '.join(
['{}{}'.format(
'' if self.names.get(e)
else '{}: '.format(self.names[e]), e) for e in bcc])))
# Perform upstream fetch
postokay, response = self._fetch(
url=url, payload=dumps(payload),
content_type='application/json')
# Test if we were okay
if not postokay:
has_error = True
@ -453,6 +558,20 @@ class NotifyOffice365(NotifyBase):
# Our URL parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
if self.cc:
# Handle our Carbon Copy Addresses
params['cc'] = ','.join(
['{}{}'.format(
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.cc])
if self.bcc:
# Handle our Blind Carbon Copy Addresses
params['bcc'] = ','.join(
['{}{}'.format(
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.bcc])
return '{schema}://{tenant}:{email}/{client_id}/{secret}' \
'/{targets}/?{params}'.format(
schema=self.secure_protocol,
@ -465,7 +584,9 @@ class NotifyOffice365(NotifyBase):
self.secret, privacy, mode=PrivacyMode.Secret,
safe=''),
targets='/'.join(
[NotifyOffice365.quote(x, safe='') for x in self.targets]),
[NotifyOffice365.quote('{}{}'.format(
'' if not e[0] else '{}:'.format(e[0]), e[1]),
safe='') for e in self.targets]),
params=NotifyOffice365.urlencode(params))
@staticmethod
@ -572,4 +693,12 @@ class NotifyOffice365(NotifyBase):
results['targets'] += \
NotifyOffice365.parse_list(results['qsd']['to'])
# Handle Carbon Copy Addresses
if 'cc' in results['qsd'] and len(results['qsd']['cc']):
results['cc'] = results['qsd']['cc']
# Handle Blind Carbon Copy Addresses
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
results['bcc'] = results['qsd']['bcc']
return results

View File

@ -28,7 +28,7 @@ import requests
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import GET_EMAIL_RE
from ..utils import is_email
from ..utils import parse_list
from ..utils import parse_bool
from ..utils import validate_regex
@ -142,10 +142,10 @@ class NotifyPopcornNotify(NotifyBase):
self.targets.append(result)
continue
result = GET_EMAIL_RE.match(target)
result = is_email(target)
if result:
# store valid email
self.targets.append(target)
self.targets.append(result['full_email'])
continue
self.logger.warning(

View File

@ -28,7 +28,7 @@ from json import dumps
from json import loads
from .NotifyBase import NotifyBase
from ..utils import GET_EMAIL_RE
from ..utils import is_email
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
@ -230,22 +230,29 @@ class NotifyPushBullet(NotifyBase):
'body': body,
}
if recipient is PUSHBULLET_SEND_TO_ALL:
# Check if an email was defined
match = is_email(recipient)
if match:
payload['email'] = match['full_email']
self.logger.debug(
"PushBullet recipient {} parsed as an email address"
.format(recipient))
elif recipient is PUSHBULLET_SEND_TO_ALL:
# Send to all
pass
elif GET_EMAIL_RE.match(recipient):
payload['email'] = recipient
self.logger.debug(
"Recipient '%s' is an email address" % recipient)
elif recipient[0] == '#':
payload['channel_tag'] = recipient[1:]
self.logger.debug("Recipient '%s' is a channel" % recipient)
self.logger.debug(
"PushBullet recipient {} parsed as a channel"
.format(recipient))
else:
payload['device_iden'] = recipient
self.logger.debug("Recipient '%s' is a device" % recipient)
self.logger.debug(
"PushBullet recipient {} parsed as a device"
.format(recipient))
okay, response = self._send(
self.notify_url.format('pushes'), payload)

View File

@ -50,7 +50,7 @@ from .NotifyBase import NotifyBase
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import parse_list
from ..utils import GET_EMAIL_RE
from ..utils import is_email
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
@ -170,18 +170,15 @@ class NotifySendGrid(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
self.from_email = from_email
try:
result = GET_EMAIL_RE.match(self.from_email)
if not result:
# let outer exception handle this
raise TypeError
except (TypeError, AttributeError):
msg = 'Invalid ~From~ email specified: {}'.format(self.from_email)
result = is_email(from_email)
if not result:
msg = 'Invalid ~From~ email specified: {}'.format(from_email)
self.logger.warning(msg)
raise TypeError(msg)
# Store email address
self.from_email = result['full_email']
# Acquire Targets (To Emails)
self.targets = list()
@ -201,8 +198,9 @@ class NotifySendGrid(NotifyBase):
# Validate recipients (to:) and drop bad ones:
for recipient in parse_list(targets):
if GET_EMAIL_RE.match(recipient):
self.targets.append(recipient)
result = is_email(recipient)
if result:
self.targets.append(result['full_email'])
continue
self.logger.warning(
@ -213,8 +211,9 @@ class NotifySendGrid(NotifyBase):
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_list(cc):
if GET_EMAIL_RE.match(recipient):
self.cc.add(recipient)
result = is_email(recipient)
if result:
self.cc.add(result['full_email'])
continue
self.logger.warning(
@ -225,8 +224,9 @@ class NotifySendGrid(NotifyBase):
# Validate recipients (bcc:) and drop bad ones:
for recipient in parse_list(bcc):
if GET_EMAIL_RE.match(recipient):
self.bcc.add(recipient)
result = is_email(recipient)
if result:
self.bcc.add(result['full_email'])
continue
self.logger.warning(

View File

@ -36,7 +36,7 @@ from ..URLBase import PrivacyMode
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import parse_list
from ..utils import GET_EMAIL_RE
from ..utils import is_email
from ..AppriseLocale import gettext_lazy as _
@ -140,12 +140,6 @@ class NotifyTwist(NotifyBase):
# <workspace_id>:<channel_id>
self.channel_ids = set()
# Initialize our Email Object
self.email = email if email else '{}@{}'.format(
self.user,
self.host,
)
# The token is None if we're not logged in and False if we
# failed to log in. Otherwise it is set to the actual token
self.token = None
@ -171,26 +165,31 @@ class NotifyTwist(NotifyBase):
# }
self._cached_channels = dict()
try:
result = GET_EMAIL_RE.match(self.email)
if not result:
# let outer exception handle this
raise TypeError
# Initialize our Email Object
self.email = email if email else '{}@{}'.format(
self.user,
self.host,
)
if email:
# Force user/host to be that of the defined email for
# consistency. This is very important for those initializing
# this object with the the email object would could potentially
# cause inconsistency to contents in the NotifyBase() object
self.user = result.group('fulluser')
self.host = result.group('domain')
except (TypeError, AttributeError):
# Check if it is valid
result = is_email(self.email)
if not result:
# let outer exception handle this
msg = 'The Twist Auth email specified ({}) is invalid.'\
.format(self.email)
self.logger.warning(msg)
raise TypeError(msg)
# Re-assign email based on what was parsed
self.email = result['full_email']
if email:
# Force user/host to be that of the defined email for
# consistency. This is very important for those initializing
# this object with the the email object would could potentially
# cause inconsistency to contents in the NotifyBase() object
self.user = result['user']
self.host = result['domain']
if not self.password:
msg = 'No Twist password was specified with account: {}'\
.format(self.email)

View File

@ -62,7 +62,7 @@ from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
from ..utils import GET_EMAIL_RE
from ..utils import is_email
from ..AppriseLocale import gettext_lazy as _
# A Valid Bot Name
@ -260,7 +260,8 @@ class NotifyZulip(NotifyBase):
targets = list(self.targets)
while len(targets):
target = targets.pop(0)
if GET_EMAIL_RE.match(target):
result = is_email(target)
if result:
# Send a private message
payload['type'] = 'private'
else:
@ -268,7 +269,7 @@ class NotifyZulip(NotifyBase):
payload['type'] = 'stream'
# Set our target
payload['to'] = target
payload['to'] = target if not result else result['full_email']
self.logger.debug('Zulip POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,

View File

@ -449,7 +449,7 @@ def url_to_dict(url):
schema = GET_SCHEMA_RE.match(_url)
if schema is None:
# Not a valid URL; take an early exit
logger.error('Unsupported URL {}'.format(url))
logger.error('Unsupported URL: {}'.format(url))
return None
# Ensure our schema is always in lower case

View File

@ -104,16 +104,24 @@ GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I)
# Regular expression based and expanded from:
# http://www.regular-expressions.info/email.html
# Extended to support colon (:) delimiter for parsing names from the URL
# such as:
# - 'Optional Name':user@example.com
# - 'Optional Name' <user@example.com>
#
# The expression also parses the general email as well such as:
# - user@example.com
# - label+user@example.com
GET_EMAIL_RE = re.compile(
r"(?P<fulluser>((?P<label>[^+]+)\+)?"
r"(?P<userid>[a-z0-9$%=_~-]+"
r"(?:\.[a-z0-9$%+=_~-]+)"
r"*))@(?P<domain>("
r"(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+"
r"[a-z0-9](?:[a-z0-9-]*[a-z0-9]))|"
r"[a-z0-9][a-z0-9-]{5,})",
re.IGNORECASE,
)
r'((?P<name>[^:<]+)?[:<\s]+)?'
r'(?P<full_email>((?P<label>[^+]+)\+)?'
r'(?P<email>(?P<userid>[a-z0-9$%=_~-]+'
r'(?:\.[a-z0-9$%+=_~-]+)'
r'*)@(?P<domain>('
r'(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+'
r'[a-z0-9](?:[a-z0-9-]*[a-z0-9]))|'
r'[a-z0-9][a-z0-9-]{5,})))'
r'\s*>?', re.IGNORECASE)
# Regular expression used to extract a phone number
GET_PHONE_NO_RE = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
@ -122,6 +130,12 @@ GET_PHONE_NO_RE = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
URL_DETECTION_RE = re.compile(
r'([a-z0-9]+?:\/\/.*?)(?=$|[\s,]+[a-z0-9]{2,9}?:\/\/)', re.I)
EMAIL_DETECTION_RE = re.compile(
r'[\s,]*([^@]+@.*?)(?=$|[\s,]+'
+ r'(?:[^:<]+?[:<\s]+?)?'
r'[^@\s,]+@[^\s,]+)',
re.IGNORECASE)
# validate_regex() utilizes this mapping to track and re-use pre-complied
# regular expressions
REGEX_VALIDATE_LOOKUP = {}
@ -227,11 +241,33 @@ def is_email(address):
"""
try:
return GET_EMAIL_RE.match(address) is not None
match = GET_EMAIL_RE.match(address)
except TypeError:
# invalid syntax
# not parseable content
return False
if match:
return {
# The name parsed from the URL (if one exists)
'name': '' if match.group('name') is None
else match.group('name').strip(),
# The email address
'email': match.group('email'),
# The full email address (includes label if specified)
'full_email': match.group('full_email'),
# The label (if specified) e.g: label+user@example.com
'label': '' if match.group('label') is None
else match.group('label').strip(),
# The user (which does not include the label) from the email
# parsed.
'user': match.group('userid'),
# The domain associated with the email address
'domain': match.group('domain'),
}
return False
def tidy_path(path):
"""take a filename and or directory and attempts to tidy it up by removing
@ -536,19 +572,76 @@ def parse_bool(arg, default=False):
return bool(arg)
def split_urls(urls):
def parse_emails(*args, **kwargs):
"""
Takes a string containing URLs separated by comma's and/or spaces and
returns a list.
"""
try:
results = URL_DETECTION_RE.findall(urls)
# for Python 2.7 support, store_unparsable is not in the url above
# as just parse_emails(*args, store_unparseable=True) since it is
# an invalid syntax. This is the workaround to be backards compatible:
store_unparseable = kwargs.get('store_unparseable', True)
except TypeError:
results = []
result = []
for arg in args:
if isinstance(arg, six.string_types) and arg:
_result = EMAIL_DETECTION_RE.findall(arg)
if _result:
result += _result
return results
elif not _result and store_unparseable:
# we had content passed into us that was lost because it was
# so poorly formatted that it didn't even come close to
# meeting the regular expression we defined. We intentially
# keep it as part of our result set so that parsing done
# at a higher level can at least report this to the end user
# and hopefully give them some indication as to what they
# may have done wrong.
result += \
[x for x in filter(bool, re.split(STRING_DELIMITERS, arg))]
elif isinstance(arg, (set, list, tuple)):
# Use recursion to handle the list of Emails
result += parse_emails(*arg, store_unparseable=store_unparseable)
return result
def parse_urls(*args, **kwargs):
"""
Takes a string containing URLs separated by comma's and/or spaces and
returns a list.
"""
# for Python 2.7 support, store_unparsable is not in the url above
# as just parse_urls(*args, store_unparseable=True) since it is
# an invalid syntax. This is the workaround to be backards compatible:
store_unparseable = kwargs.get('store_unparseable', True)
result = []
for arg in args:
if isinstance(arg, six.string_types) and arg:
_result = URL_DETECTION_RE.findall(arg)
if _result:
result += _result
elif not _result and store_unparseable:
# we had content passed into us that was lost because it was
# so poorly formatted that it didn't even come close to
# meeting the regular expression we defined. We intentially
# keep it as part of our result set so that parsing done
# at a higher level can at least report this to the end user
# and hopefully give them some indication as to what they
# may have done wrong.
result += \
[x for x in filter(bool, re.split(STRING_DELIMITERS, arg))]
elif isinstance(arg, (set, list, tuple)):
# Use recursion to handle the list of URLs
result += parse_urls(*arg, store_unparseable=store_unparseable)
return result
def parse_list(*args):

View File

@ -108,6 +108,12 @@ def test_apprise():
'3ccdd113474722377935511fc85d3dd4') is True
assert len(a) == 3
# Try adding nothing but delimiters
assert a.add(',, ,, , , , ,') is False
# The number of servers added doesn't change
assert len(a) == 3
# We can pop an object off of our stack by it's indexed value:
obj = a.pop(0)
assert isinstance(obj, NotifyBase) is True

View File

@ -211,6 +211,17 @@ TEST_URLS = (
'instance': plugins.NotifyEmail,
'privacy_url': 'mailto://localhost',
}),
# Test multi-emails where some are bad
('mailto://user:pass@localhost/test@example.com/test2@/$@!/', {
'instance': plugins.NotifyEmail,
'privacy_url': 'mailto://user:****@localhost/'
}),
('mailto://user:pass@localhost/?bcc=test2@,$@!/', {
'instance': plugins.NotifyEmail,
}),
('mailto://user:pass@localhost/?cc=test2@,$@!/', {
'instance': plugins.NotifyEmail,
}),
)
@ -402,7 +413,7 @@ def test_webbase_lookup(mock_smtp, mock_smtpssl):
assert isinstance(obj, plugins.NotifyEmail)
assert len(obj.targets) == 1
assert 'user@l2g.com' in obj.targets
assert (False, 'user@l2g.com') in obj.targets
assert obj.from_addr == 'user@l2g.com'
assert obj.password == 'pass'
assert obj.user == 'user'
@ -655,8 +666,8 @@ def test_email_url_variations():
assert obj.password == 'abcd123'
assert obj.user == 'apprise@example21.ca'
assert len(obj.targets) == 1
assert 'apprise@example.com' in obj.targets
assert obj.targets[0] == obj.from_addr
assert (False, 'apprise@example.com') in obj.targets
assert obj.targets[0][1] == obj.from_addr
# test user and password specified in the url body (as an argument)
# this always over-rides the entries at the front of the url
@ -672,8 +683,8 @@ def test_email_url_variations():
assert obj.password == 'abcd123'
assert obj.user == 'apprise@example21.ca'
assert len(obj.targets) == 1
assert 'apprise@example.com' in obj.targets
assert obj.targets[0] == obj.from_addr
assert (False, 'apprise@example.com') in obj.targets
assert obj.targets[0][1] == obj.from_addr
assert obj.smtp_host == 'example.com'
# test a complicated example
@ -696,7 +707,7 @@ def test_email_url_variations():
assert obj.port == 1234
assert obj.smtp_host == 'smtp.example.edu'
assert len(obj.targets) == 1
assert 'to@example.jp' in obj.targets
assert (False, 'to@example.jp') in obj.targets
assert obj.from_addr == 'from@example.jp'

View File

@ -23,6 +23,7 @@
# THE SOFTWARE.
import os
import six
import mock
import pytest
import requests
@ -76,6 +77,30 @@ def test_office365_general(mock_post):
assert isinstance(obj, plugins.NotifyOffice365)
# Test our URL generation
assert isinstance(obj.url(), six.string_types)
# Test our notification
assert obj.notify(title='title', body='test') is True
# Instantiate our object
obj = Apprise.instantiate(
'o365://{tenant}:{email}/{tenant}/{secret}/{targets}'
'?bcc={bcc}&cc={cc}'.format(
tenant=tenant,
email=email,
secret=secret,
targets=targets,
# Test the cc and bcc list (use good and bad email)
cc='Chuck Norris cnorris@yahoo.ca, Sauron@lotr.me, invalid@!',
bcc='Bruce Willis bwillis@hotmail.com, Frodo@lotr.me invalid@!',
))
assert isinstance(obj, plugins.NotifyOffice365)
# Test our URL generation
assert isinstance(obj.url(), six.string_types)
# Test our notification
assert obj.notify(title='title', body='test') is True
@ -110,22 +135,27 @@ def test_office365_general(mock_post):
)
# One of the targets are invalid
plugins.NotifyOffice365(
obj = plugins.NotifyOffice365(
email=email,
client_id=client_id,
tenant=tenant,
secret=secret,
targets=('abc@gmail.com', 'garbage'),
targets=('Management abc@gmail.com', 'garbage'),
)
# Test our notification (this will work and only notify abc@gmail.com)
assert obj.notify(title='title', body='test') is True
# all of the targets are invalid
assert plugins.NotifyOffice365(
obj = plugins.NotifyOffice365(
email=email,
client_id=client_id,
tenant=tenant,
secret=secret,
targets=('invalid', 'garbage'),
).notify(body="test") is False
)
# Test our notification (which will fail because of no entries)
assert obj.notify(title='title', body='test') is False
@mock.patch('requests.post')

View File

@ -570,57 +570,254 @@ def test_is_email():
"""
# Valid Emails
assert utils.is_email('test@gmail.com') is True
assert utils.is_email('tag+test@gmail.com') is True
results = utils.is_email('test@gmail.com')
assert '' == results['name']
assert 'test@gmail.com' == results['email']
assert 'test@gmail.com' == results['full_email']
assert 'gmail.com' == results['domain']
assert 'test' == results['user']
assert '' == results['label']
results = utils.is_email('tag+test@gmail.com')
assert '' == results['name']
assert 'test@gmail.com' == results['email']
assert 'tag+test@gmail.com' == results['full_email']
assert 'gmail.com' == results['domain']
assert 'test' == results['user']
assert 'tag' == results['label']
# Support Full Names as well
results = utils.is_email('Bill Gates: bgates@microsoft.com')
assert 'Bill Gates' == results['name']
assert 'bgates@microsoft.com' == results['email']
assert 'bgates@microsoft.com' == results['full_email']
assert 'microsoft.com' == results['domain']
assert 'bgates' == results['user']
assert '' == results['label']
results = utils.is_email('Bill Gates <bgates@microsoft.com>')
assert 'Bill Gates' == results['name']
assert 'bgates@microsoft.com' == results['email']
assert 'bgates@microsoft.com' == results['full_email']
assert 'microsoft.com' == results['domain']
assert 'bgates' == results['user']
assert '' == results['label']
results = utils.is_email('Bill Gates: <bgates@microsoft.com>')
assert 'Bill Gates' == results['name']
assert 'bgates@microsoft.com' == results['email']
assert 'bgates@microsoft.com' == results['full_email']
assert 'microsoft.com' == results['domain']
assert 'bgates' == results['user']
assert '' == results['label']
results = utils.is_email('Sundar Pichai <ceo+spichai@gmail.com>')
assert 'Sundar Pichai' == results['name']
assert 'spichai@gmail.com' == results['email']
assert 'ceo+spichai@gmail.com' == results['full_email']
assert 'gmail.com' == results['domain']
assert 'spichai' == results['user']
assert 'ceo' == results['label']
# An email without name, but contains delimiters
results = utils.is_email(' <spichai@gmail.com>')
assert '' == results['name']
assert 'spichai@gmail.com' == results['email']
assert 'spichai@gmail.com' == results['full_email']
assert 'gmail.com' == results['domain']
assert 'spichai' == results['user']
assert '' == results['label']
# a valid email not properly delimited with a colon or angle bracket
# We do a best guess and still parse it correctly
results = utils.is_email("Name valid@example.com")
assert 'Name' == results['name']
assert 'valid@example.com' == results['email']
assert 'valid@example.com' == results['full_email']
assert 'example.com' == results['domain']
assert 'valid' == results['user']
assert '' == results['label']
# a valid email not properly delimited with a colon or angle bracket
# We do a best guess and still parse it correctly
results = utils.is_email("Руслан Эра russian+russia@example.ru")
assert 'Руслан Эра' == results['name']
assert 'russia@example.ru' == results['email']
assert 'russian+russia@example.ru' == results['full_email']
assert 'example.ru' == results['domain']
assert 'russia' == results['user']
assert 'russian' == results['label']
# Invalid Emails
assert utils.is_email('invalid.com') is False
assert utils.is_email(object()) is False
assert utils.is_email(None) is False
assert utils.is_email("Just A Name") is False
assert utils.is_email("Name <bademail>") is False
def test_split_urls():
"""utils: split_urls() testing """
def test_parse_emails():
"""utils: parse_emails() testing """
# A simple single array entry (As str)
results = utils.split_urls('')
results = utils.parse_emails('')
assert isinstance(results, list)
assert len(results) == 0
# just delimeters
results = utils.split_urls(', ,, , ,,, ')
results = utils.parse_emails(', ,, , ,,, ')
assert isinstance(results, list)
assert len(results) == 0
results = utils.split_urls(',')
results = utils.parse_emails(',')
assert isinstance(results, list)
assert len(results) == 0
results = utils.split_urls(None)
results = utils.parse_emails(None)
assert isinstance(results, list)
assert len(results) == 0
results = utils.split_urls(42)
results = utils.parse_emails(42)
assert isinstance(results, list)
assert len(results) == 0
results = utils.split_urls('this is not a parseable url at all')
results = utils.parse_emails('this is not a parseable email at all')
assert isinstance(results, list)
assert len(results) == 8
# Now we do it again with the store_unparsable flag set to False
results = utils.parse_emails(
'this is not a parseable email at all', store_unparseable=False)
assert isinstance(results, list)
assert len(results) == 0
# Now test valid URLs
results = utils.split_urls('windows://')
results = utils.parse_emails('user@example.com')
assert isinstance(results, list)
assert len(results) == 1
assert 'user@example.com' in results
results = utils.parse_emails('a@')
assert isinstance(results, list)
assert len(results) == 1
assert 'a@' in results
results = utils.parse_emails('user1@example.com user2@example.com')
assert isinstance(results, list)
assert len(results) == 2
assert 'user1@example.com' in results
assert 'user2@example.com' in results
# Commas and spaces found inside URLs are ignored
emails = [
'user1@example.com,',
'test1@example.com,,, abcd@example.com',
'Chuck Norris roundhouse@kick.com',
'David Spade dspade@example.com, Yours Truly yours@truly.com',
]
results = utils.parse_emails(', '.join(emails))
assert isinstance(results, list)
assert len(results) == 6
assert 'user1@example.com' in results
assert 'test1@example.com' in results
assert 'abcd@example.com' in results
assert 'Chuck Norris roundhouse@kick.com' in results
assert 'David Spade dspade@example.com' in results
assert 'Yours Truly yours@truly.com' in results
# Test triangle bracket parsing
# Commas and spaces found inside URLs are ignored
emails = [
'User1 user1@example.com',
'User 2 user2@example.com',
'User Three <user3@example.com>',
'The Forth User: <user4@example.com>',
'5th User: user4@example.com',
]
results = utils.parse_emails(', '.join(emails))
assert isinstance(results, list)
assert len(results) == len(emails)
for email in emails:
assert email in results
# pass the entries in as a list
results = utils.parse_emails(emails)
assert isinstance(results, list)
assert len(results) == len(emails)
for email in emails:
assert email in results
# Pass in some unparseables
results = utils.parse_emails('garbage')
assert isinstance(results, list)
assert len(results) == 1
results = utils.parse_emails('garbage', store_unparseable=False)
assert isinstance(results, list)
assert len(results) == 0
# Pass in garbage
results = utils.parse_emails(object)
assert isinstance(results, list)
assert len(results) == 0
results = utils.parse_emails(42)
assert isinstance(results, list)
assert len(results) == 0
results = utils.parse_emails([None, object, 42])
assert isinstance(results, list)
assert len(results) == 0
def test_parse_urls():
"""utils: parse_urls() testing """
# A simple single array entry (As str)
results = utils.parse_urls('')
assert isinstance(results, list)
assert len(results) == 0
# just delimeters
results = utils.parse_urls(', ,, , ,,, ')
assert isinstance(results, list)
assert len(results) == 0
results = utils.parse_urls(',')
assert isinstance(results, list)
assert len(results) == 0
results = utils.parse_urls(None)
assert isinstance(results, list)
assert len(results) == 0
results = utils.parse_urls(42)
assert isinstance(results, list)
assert len(results) == 0
results = utils.parse_urls('this is not a parseable url at all')
assert isinstance(results, list)
# we still end up returning this
assert len(results) == 8
results = utils.parse_urls(
'this is not a parseable url at all', store_unparseable=False)
assert isinstance(results, list)
assert len(results) == 0
# Now test valid URLs
results = utils.parse_urls('windows://')
assert isinstance(results, list)
assert len(results) == 1
assert 'windows://' in results
results = utils.split_urls('windows:// gnome://')
results = utils.parse_urls('windows:// gnome://')
assert isinstance(results, list)
assert len(results) == 2
assert 'windows://' in results
assert 'gnome://' in results
# We don't want to parse out URLs that are part of another URL's arguments
results = utils.split_urls('discord://host?url=https://localhost')
results = utils.parse_urls('discord://host?url=https://localhost')
assert isinstance(results, list)
assert len(results) == 1
assert 'discord://host?url=https://localhost' in results
@ -644,7 +841,7 @@ def test_split_urls():
# comma exists as part of the URL and is therefore lost if it was found
# at the end of it.
results = utils.split_urls(', '.join(urls))
results = utils.parse_urls(', '.join(urls))
assert isinstance(results, list)
assert len(results) == len(urls)
for url in urls:
@ -656,7 +853,7 @@ def test_split_urls():
# The comma at the end of the password will not be lost if we're
# dealing with a single entry:
url = 'http://hostname?password=,abcd,'
results = utils.split_urls(url)
results = utils.parse_urls(url)
assert isinstance(results, list)
assert len(results) == 1
assert url in results
@ -667,7 +864,7 @@ def test_split_urls():
'schema1://hostname?password=,abcd,',
'schema2://hostname?password=,abcd,',
]
results = utils.split_urls(', '.join(urls))
results = utils.parse_urls(', '.join(urls))
assert isinstance(results, list)
assert len(results) == len(urls)
@ -680,6 +877,24 @@ def test_split_urls():
# schema2://hostname?password=,abcd,
assert urls[1] in results
# Pass the list in (as a list); results are the same
results = utils.parse_urls(urls)
assert isinstance(results, list)
assert len(results) == len(urls)
# Pass in garbage
results = utils.parse_urls(object)
assert isinstance(results, list)
assert len(results) == 0
results = utils.parse_urls(42)
assert isinstance(results, list)
assert len(results) == 0
results = utils.parse_urls([None, object, 42])
assert isinstance(results, list)
assert len(results) == 0
def test_parse_list():
"""utils: parse_list() testing """