refactored code to parse URL correctly

This commit is contained in:
Chris Caron 2024-10-27 16:31:25 -04:00
parent f6c7dde9c7
commit e2897fe2e4
2 changed files with 215 additions and 63 deletions

View File

@ -50,6 +50,24 @@ from ..utils import validate_regex
from ..locale import gettext_lazy as _
class Office365WebhookMode:
"""
Office 365 Webhook Mode
"""
# Send message as ourselves using the /me/ endpoint
SELF = 'self'
# Send message as ourselves using the /users/ endpoint
AS_USER = 'user'
# Define the modes in a list for validation purposes
OFFICE365_WEBHOOK_MODES = (
Office365WebhookMode.SELF,
Office365WebhookMode.AS_USER,
)
class NotifyOffice365(NotifyBase):
"""
A wrapper for Office 365 Notifications
@ -62,7 +80,7 @@ class NotifyOffice365(NotifyBase):
service_url = 'https://office.com/'
# The default protocol
secure_protocol = 'o365'
secure_protocol = ('azure', 'o365')
# Allow 300 requests per minute.
# 60/300 = 0.2
@ -103,7 +121,6 @@ class NotifyOffice365(NotifyBase):
'{schema}://{email}/{tenant}/{client_id}/{secret}',
'{schema}://{email}/{tenant}/{client_id}/{secret}/{targets}',
# Send from 'me'
'{schema}://{tenant}/{client_id}/{secret}',
'{schema}://{tenant}/{client_id}/{secret}/{targets}',
)
@ -164,15 +181,33 @@ class NotifyOffice365(NotifyBase):
'oauth_secret': {
'alias_of': 'secret',
},
'mode': {
'name': _('Webhook Mode'),
'type': 'choice:string',
'values': OFFICE365_WEBHOOK_MODES,
'default': Office365WebhookMode.SELF,
},
})
def __init__(self, tenant, email, client_id, secret,
targets=None, cc=None, bcc=None, **kwargs):
def __init__(self, tenant, client_id, secret, email=None,
mode=None, targets=None, cc=None, bcc=None, **kwargs):
"""
Initialize Office 365 Object
"""
super().__init__(**kwargs)
# Prepare our Mode
self.mode = self.template_args['mode']['default'] \
if not mode else next(
(f for f in OFFICE365_WEBHOOK_MODES
if f.startswith(
mode.lower())), None)
if mode and not self.mode:
msg = \
'The specified Webhook mode ({}) was not found '.format(mode)
self.logger.warning(msg)
raise TypeError(msg)
# Tenant identifier
self.tenant = validate_regex(
tenant, *self.template_tokens['tenant']['regex'])
@ -182,16 +217,24 @@ class NotifyOffice365(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
result = is_email(email)
if not result:
msg = 'An invalid Office 365 Email Account ID' \
'({}) was specified.'.format(email)
self.email = None
if email is not None:
result = is_email(email)
if not result:
msg = 'An invalid Office 365 Email Account ID' \
'({}) was specified.'.format(email)
self.logger.warning(msg)
raise TypeError(msg)
# Otherwise store our the email address
self.email = result['full_email']
elif self.mode != Office365WebhookMode.SELF:
msg = 'An expected Office 365 Email was not specified ' \
'(mode={})'.format(self.mode)
self.logger.warning(msg)
raise TypeError(msg)
# Otherwise store our the email address
self.email = result['full_email']
# Client Key (associated with generated OAuth2 Login)
self.client_id = validate_regex(
client_id, *self.template_tokens['client_id']['regex'])
@ -318,7 +361,7 @@ class NotifyOffice365(NotifyBase):
# Define our URL to post to
url = '{graph_url}/v1.0/me/sendMail'.format(
graph_url=self.graph_url,
) if not self.self.email \
) if not self.email \
else '{graph_url}/v1.0/users/{userid}/sendMail'.format(
userid=self.email,
graph_url=self.graph_url,
@ -616,7 +659,7 @@ class NotifyOffice365(NotifyBase):
here.
"""
return (
self.secure_protocol, self.email, self.tenant, self.client_id,
self.secure_protocol[0], self.email, self.tenant, self.client_id,
self.secret,
)
@ -625,8 +668,13 @@ class NotifyOffice365(NotifyBase):
Returns the URL built dynamically based on specified arguments.
"""
# Our URL parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
# Define any URL parameters
params = {
'mode': self.mode,
}
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
if self.cc:
# Handle our Carbon Copy Addresses
@ -642,13 +690,13 @@ class NotifyOffice365(NotifyBase):
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.bcc])
return '{schema}://{tenant}:{email}/{client_id}/{secret}' \
return '{schema}://{email}{tenant}/{client_id}/{secret}' \
'/{targets}/?{params}'.format(
schema=self.secure_protocol,
schema=self.secure_protocol[0],
tenant=self.pprint(self.tenant, privacy, safe=''),
# email does not need to be escaped because it should
# already be a valid host and username at this point
email=self.email,
email=self.email + '/' if self.email else '',
client_id=self.pprint(self.client_id, privacy, safe=''),
secret=self.pprint(
self.secret, privacy, mode=PrivacyMode.Secret,
@ -656,7 +704,7 @@ class NotifyOffice365(NotifyBase):
targets='/'.join(
[NotifyOffice365.quote('{}{}'.format(
'' if not e[0] else '{}:'.format(e[0]), e[1]),
safe='') for e in self.targets]),
safe='@') for e in self.targets]),
params=NotifyOffice365.urlencode(params))
def __len__(self):
@ -687,6 +735,7 @@ class NotifyOffice365(NotifyBase):
# Initialize our tenant
results['tenant'] = None
# Initialize our email
results['email'] = None
@ -697,28 +746,36 @@ class NotifyOffice365(NotifyBase):
results['email'] = \
NotifyOffice365.unquote(results['qsd']['from'])
# Hostname is no longer part of `from` and possibly instead
# is the tenant id
entries.insert(0, NotifyOffice365.unquote(results['host']))
# Tenant
if 'tenant' in results['qsd'] and \
len(results['qsd']['tenant']):
# Extract the Tenant from the argument
results['tenant'] = \
NotifyOffice365.unquote(results['qsd']['tenant'])
# If tenant is occupied, then the user defined makes
# up our email
if not results['email'] and results['user']:
# If tenant is occupied, then the user defined makes up our email
elif results['user']:
results['email'] = '{}@{}'.format(
NotifyOffice365.unquote(results['user']),
NotifyOffice365.unquote(results['host']),
)
elif not results['user']:
# Only tenant id specified (emails are sent 'from me')
results['tenant'] = NotifyOffice365.unquote(results['host'])
else:
# Hostname is no longer part of `from` and possibly instead
# is the tenant id
entries.insert(0, NotifyOffice365.unquote(results['host']))
# Tenant
if 'tenant' in results['qsd'] and len(results['qsd']['tenant']):
# Extract the Tenant from the argument
results['tenant'] = \
NotifyOffice365.unquote(results['qsd']['tenant'])
elif entries:
results['tenant'] = NotifyOffice365.unquote(entries.pop(0))
# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])
elif entries:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyOffice365.unquote(entries.pop(0))
#
# Prepare our target listing
@ -740,16 +797,6 @@ class NotifyOffice365(NotifyBase):
# We're done
break
# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])
elif entries:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyOffice365.unquote(entries.pop(0))
# OAuth2 Secret
if 'oauth_secret' in results['qsd'] and \
len(results['qsd']['oauth_secret']):
@ -778,4 +825,8 @@ class NotifyOffice365(NotifyBase):
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
results['bcc'] = results['qsd']['bcc']
# Handle Mode
if 'mode' in results['qsd'] and len(results['qsd']['mode']):
results['mode'] = results['qsd']['mode']
return results

View File

@ -34,6 +34,8 @@ import requests
from datetime import datetime
from json import dumps
from apprise import Apprise
from apprise import NotifyType
from apprise import AppriseAttachment
from apprise.plugins.office365 import NotifyOffice365
from helpers import AppriseURLTester
@ -57,7 +59,7 @@ apprise_url_tests = (
# invalid url
'instance': TypeError,
}),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
# invalid tenant
tenant=',',
cid='ab-cd-ef-gh',
@ -65,10 +67,10 @@ apprise_url_tests = (
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
# Expected failure
'instance': TypeError,
}),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
# invalid client id
cid='ab.',
@ -76,16 +78,53 @@ apprise_url_tests = (
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
# Expected failure
'instance': TypeError,
}),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}?mode=invalid'.format(
# Invalid mode
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.com',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# Expected failure
'instance': TypeError,
}),
('o365://{tenant}/{cid}/{secret}/{targets}?mode=user'.format(
# Invalid mode when no email specified
tenant='tenant',
cid='ab-cd-ef-gh',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# Expected failure
'instance': TypeError,
}),
('o365://{tenant}/{cid}/{secret}/{targets}?mode=self'.format(
# email not required if mode is set to self
tenant='tenant',
cid='ab-cd-ef-gh',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
'instance': NotifyOffice365,
# Test what happens if a batch send fails to return a messageCount
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
},
}),
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.edu',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
'instance': NotifyOffice365,
@ -96,14 +135,14 @@ apprise_url_tests = (
},
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'o365://t...t:user@example.com/a...h/'
'****/email1%40test.ca/'}),
'privacy_url': 'azure://user@example.edu/t...t/a...h/'
'****/email1@test.ca/'}),
# test our arguments
('o365://_/?oauth_id={cid}&oauth_secret={secret}&tenant={tenant}'
'&to={targets}&from={aid}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.com',
aid='user@example.ca',
secret='abcd/123/3343/@jack/test',
targets='email1@test.ca'),
{
@ -117,10 +156,10 @@ apprise_url_tests = (
},
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'o365://t...t:user@example.com/a...h/'
'****/email1%40test.ca/'}),
'privacy_url': 'azure://user@example.ca/t...t/a...h/'
'****/email1@test.ca/'}),
# Test invalid JSON (no tenant defaults to email domain)
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.com',
@ -135,7 +174,7 @@ apprise_url_tests = (
'notify_response': False,
}),
# No Targets specified
('o365://{tenant}:{aid}/{cid}/{secret}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.com',
@ -150,7 +189,7 @@ apprise_url_tests = (
'access_token': 'abcd1234',
},
}),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
cid='zz-zz-zz-zz',
aid='user@example.com',
@ -212,7 +251,7 @@ def test_plugin_office365_general(mock_post):
# Instantiate our object
obj = Apprise.instantiate(
'o365://{tenant}:{email}/{tenant}/{secret}/{targets}'.format(
'o365://{email}/{tenant}/{secret}/{targets}'.format(
tenant=tenant,
email=email,
secret=secret,
@ -228,10 +267,11 @@ def test_plugin_office365_general(mock_post):
# Instantiate our object
obj = Apprise.instantiate(
'o365://{tenant}:{email}/{tenant}/{secret}/{targets}'
'o365://{email}/{tenant}/{client_id}/{secret}/{targets}'
'?bcc={bcc}&cc={cc}'.format(
tenant=tenant,
email=email,
client_id=client_id,
secret=secret,
targets=targets,
# Test the cc and bcc list (use good and bad email)
@ -260,7 +300,7 @@ def test_plugin_office365_general(mock_post):
with pytest.raises(TypeError):
# Invalid email
NotifyOffice365(
email=None,
email='invalid',
client_id=client_id,
tenant=tenant,
secret=secret,
@ -336,7 +376,7 @@ def test_plugin_office365_authentication(mock_post):
# Instantiate our object
obj = Apprise.instantiate(
'o365://{tenant}:{email}/{client_id}/{secret}/{targets}'.format(
'azure://{email}/{tenant}/{client_id}/{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
email=email,
@ -394,3 +434,64 @@ def test_plugin_office365_authentication(mock_post):
del invalid_auth_entries['expires_in']
response.content = dumps(invalid_auth_entries)
assert obj.authenticate() is False
@mock.patch('requests.post')
def test_plugin_office365_attachments(mock_post):
"""
NotifyOffice365() Attachments
"""
# Initialize some generic (but valid) tokens
email = 'user@example.net'
tenant = 'ff-gg-hh-ii-jj'
client_id = 'aa-bb-cc-dd-ee'
secret = 'abcd/1234/abcd@ajd@/test'
targets = 'target@example.com'
# Prepare Mock return object
authentication = {
"token_type": "Bearer",
"expires_in": 6000,
"access_token": "abcd1234"
}
okay_response = mock.Mock()
okay_response.content = dumps(authentication)
okay_response.status_code = requests.codes.ok
mock_post.return_value = okay_response
# Instantiate our object
obj = Apprise.instantiate(
'azure://{email}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
email=email,
secret=secret,
targets=targets))
assert isinstance(obj, NotifyOffice365)
# Test Valid Attachment
path = os.path.join(TEST_VAR_DIR, 'apprise-test.gif')
attach = AppriseAttachment(path)
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Test invalid attachment
path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=path) is False
with mock.patch('base64.b64encode', side_effect=OSError()):
# We can't send the message if we fail to parse the data
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
assert mock_post.call_count == 3