Refactored Office 365 Plugin (#1225)

This commit is contained in:
Chris Caron 2024-12-07 18:19:24 -05:00 committed by GitHub
parent e9020e6f74
commit 4d21759f60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1085 additions and 211 deletions

View File

@ -269,25 +269,26 @@ class AttachBase(URLBase):
cache = self.template_args['cache']['default'] \
if self.cache is None else self.cache
if self.download_path and os.path.isfile(self.download_path) \
and cache:
try:
if self.download_path and os.path.isfile(self.download_path) \
and cache:
# We have enough reason to look further into our cached content
# and verify it has not expired.
if cache is True:
# return our fixed content as is; we will always cache it
return True
# We have enough reason to look further into our cached content
# and verify it has not expired.
if cache is True:
# return our fixed content as is; we will always cache it
return True
# Verify our cache time to determine whether we will get our
# content again.
try:
age_in_sec = time.time() - os.stat(self.download_path).st_mtime
# Verify our cache time to determine whether we will get our
# content again.
age_in_sec = \
time.time() - os.stat(self.download_path).st_mtime
if age_in_sec <= cache:
return True
except (OSError, IOError):
# The file is not present
pass
except (OSError, IOError):
# The file is not present
pass
return False if not retrieve_if_missing else self.download()
@ -359,12 +360,27 @@ class AttachBase(URLBase):
def open(self, mode='rb'):
"""
return our file pointer and track it (we'll auto close later
return our file pointer and track it (we'll auto close later)
"""
pointer = open(self.path, mode=mode)
self.__pointers.add(pointer)
return pointer
def chunk(self, size=5242880):
"""
A Generator that yield chunks of a file with the specified size.
By default the chunk size is set to 5MB (5242880 bytes)
"""
with self.open() as file:
while True:
chunk = file.read(size)
if not chunk:
break
yield chunk
def __enter__(self):
"""
support with keyword
@ -431,7 +447,15 @@ class AttachBase(URLBase):
Returns the filesize of the attachment.
"""
return os.path.getsize(self.path) if self.path else 0
if not self:
return 0
try:
return os.path.getsize(self.path) if self.path else 0
except OSError:
# OSError can occur if the file is inaccessible
return 0
def __bool__(self):
"""

View File

@ -101,7 +101,11 @@ class AttachFile(AttachBase):
# Ensure any existing content set has been invalidated
self.invalidate()
if not os.path.isfile(self.dirty_path):
try:
if not os.path.isfile(self.dirty_path):
return False
except OSError:
return False
if self.max_file_size > 0 and \

View File

@ -33,38 +33,19 @@
# Information on sending an email:
# https://docs.microsoft.com/en-us/graph/api/user-sendmail\
# ?view=graph-rest-1.0&tabs=http
# Steps to get your Microsoft Client ID, Client Secret, and Tenant ID:
# 1. You should have valid Microsoft personal account. Go to Azure Portal
# 2. Go to -> Microsoft Active Directory --> App Registrations
# 3. Click new -> give any name (your choice) in Name field -> select
# personal Microsoft accounts only --> Register
# 4. Now you have your client_id & Tenant id.
# 5. To create client_secret , go to active directory ->
# Certificate & Tokens -> New client secret
# **This is auto-generated string which may have '@' and '?'
# characters in it. You should encode these to prevent
# from having any issues.**
# 6. Now need to set permission Active directory -> API permissions ->
# Add permission (search mail) , add relevant permission.
# 7. Set the redirect uri (Web) to:
# https://login.microsoftonline.com/common/oauth2/nativeclient
#
# ...and click register.
# Note: One must set up Application Permissions (not Delegated Permissions)
# - Scopes required: Mail.Send
# - For Large Attachments: Mail.ReadWrite
# - For Email Lookups: User.Read.All
#
# This needs to be inserted into the "Redirect URI" text box as simply
# checking the check box next to this link seems to be insufficient.
# This is the default redirect uri used by this library, but you can use
# any other if you want.
#
# 8. Now you're good to go
import requests
import json
from uuid import uuid4
from datetime import datetime
from datetime import timedelta
from json import loads
from json import dumps
from .base import NotifyBase
from .. import exception
from ..url import PrivacyMode
from ..common import NotifyFormat
from ..common import NotifyType
@ -72,6 +53,7 @@ from ..utils import is_email
from ..utils import parse_emails
from ..utils import validate_regex
from ..locale import gettext_lazy as _
from ..common import PersistentStoreMode
class NotifyOffice365(NotifyBase):
@ -86,7 +68,7 @@ class NotifyOffice365(NotifyBase):
service_url = 'https://office.com/'
# The default protocol
secure_protocol = 'o365'
secure_protocol = ('azure', 'o365')
# Allow 300 requests per minute.
# 60/300 = 0.2
@ -101,6 +83,20 @@ class NotifyOffice365(NotifyBase):
# Authentication URL
auth_url = 'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token'
# Support attachments
attachment_support = True
# Our default is to no not use persistent storage beyond in-memory
# reference
storage_mode = PersistentStoreMode.AUTO
# the maximum size an attachment can be for it to be allowed to be
# uploaded inline with the current email going out (one http post)
# Anything larger than this and a second PUT request is required to
# the outlook server to post the content through reference.
# Currently (as of 2024.10.06) this was documented to be 3MB
outlook_attachment_inline_max = 3145728
# Use all the direct application permissions you have configured for your
# app. The endpoint should issue a token for the ones associated with the
# resource you want to use.
@ -113,8 +109,9 @@ class NotifyOffice365(NotifyBase):
# Define object templates
templates = (
'{schema}://{tenant}:{email}/{client_id}/{secret}',
'{schema}://{tenant}:{email}/{client_id}/{secret}/{targets}',
# Send as user (only supported method)
'{schema}://{source}/{tenant}/{client_id}/{secret}',
'{schema}://{source}/{tenant}/{client_id}/{secret}/{targets}',
)
# Define our template tokens
@ -126,8 +123,8 @@ class NotifyOffice365(NotifyBase):
'private': True,
'regex': (r'^[a-z0-9-]+$', 'i'),
},
'email': {
'name': _('Account Email'),
'source': {
'name': _('Account Email or Object ID'),
'type': 'string',
'required': True,
},
@ -176,7 +173,7 @@ class NotifyOffice365(NotifyBase):
},
})
def __init__(self, tenant, email, client_id, secret,
def __init__(self, tenant, client_id, secret, source=None,
targets=None, cc=None, bcc=None, **kwargs):
"""
Initialize Office 365 Object
@ -192,15 +189,8 @@ class NotifyOffice365(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
result = is_email(email)
if not result:
msg = 'An invalid Office 365 Email Account ID' \
'({}) was specified.'.format(email)
self.logger.warning(msg)
raise TypeError(msg)
# Otherwise store our the email address
self.email = result['full_email']
# Store our email/ObjectID Source
self.source = source
# Client Key (associated with generated OAuth2 Login)
self.client_id = validate_regex(
@ -247,8 +237,14 @@ class NotifyOffice365(NotifyBase):
.format(recipient))
else:
# If our target email list is empty we want to add ourselves to it
self.targets.append((False, self.email))
result = is_email(self.source)
if not result:
self.logger.warning('No Target Office 365 Email Detected')
else:
# If our target email list is empty we want to add ourselves to
# it
self.targets.append((False, self.source))
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_emails(cc):
@ -288,9 +284,23 @@ class NotifyOffice365(NotifyBase):
# Presume that our token has expired 'now'
self.token_expiry = datetime.now()
# Our email source; we detect this if the source is an ObjectID
# If it is unknown we set this to None
# User is the email associated with the account
self.from_email = self.store.get('from')
result = is_email(self.source)
if result:
self.from_email = result['full_email']
self.from_name = \
result['name'] or self.store.get('name')
else:
self.from_name = self.store.get('name')
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform Office 365 Notification
"""
@ -304,31 +314,143 @@ class NotifyOffice365(NotifyBase):
'There are no Email recipients to notify')
return False
if self.from_email is None:
if not self.authenticate():
# We could not authenticate ourselves; we're done
return False
# Acquire our from_email
url = "https://graph.microsoft.com/v1.0/users/{}".format(
self.source)
postokay, response = self._fetch(url=url, method='GET')
if not postokay:
self.logger.warning(
'Could not acquire From email address; ensure '
'"User.Read.All" Application scope is set!')
else: # Acquire our from_email (if possible)
from_email = \
response.get("mail") or response.get("userPrincipalName")
result = is_email(from_email)
if not result:
self.logger.warning(
'Could not get From email from the Azure endpoint.')
# Prevent re-occuring upstream fetches for info that isn't
# there
self.from_email = False
else:
# Store our email for future reference
self.from_email = result['full_email']
self.store.set('from', result['full_email'])
self.from_name = response.get("displayName")
if self.from_name:
self.store.set('name', self.from_name)
# Setup our Content Type
content_type = \
'HTML' if self.notify_format == NotifyFormat.HTML else 'Text'
# Prepare our payload
payload = {
'Message': {
'Subject': title,
'Body': {
'ContentType': content_type,
'Content': body,
'message': {
'subject': title,
'body': {
'contentType': content_type,
'content': body,
},
},
'SaveToSentItems': 'false'
# Below takes a string (not bool) of either 'true' or 'false'
'saveToSentItems': 'true'
}
if self.from_email:
# Apply from email if it is known
payload.update({
'message': {
'from': {
"emailAddress": {
"address": self.from_email,
"name": self.from_name or self.app_id,
}
},
}
})
# Create a copy of the email list
emails = list(self.targets)
# Define our URL to post to
url = '{graph_url}/v1.0/users/{email}/sendmail'.format(
email=self.email,
url = '{graph_url}/v1.0/users/{userid}/sendMail'.format(
graph_url=self.graph_url,
userid=self.source,
)
# Prepare our Draft URL
draft_url = \
'{graph_url}/v1.0/users/{userid}/messages' \
.format(
graph_url=self.graph_url,
userid=self.source,
)
small_attachments = []
large_attachments = []
# draft emails
drafts = []
if attach and self.attachment_support:
for no, attachment in enumerate(attach, start=1):
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access Office 365 attachment {}.'.format(
attachment.url(privacy=True)))
return False
if len(attachment) > self.outlook_attachment_inline_max:
# Messages larger then xMB need to be uploaded after; a
# draft email must be prepared; below is our session
large_attachments.append({
'obj': attachment,
'name': attachment.name
if attachment.name else f'file{no:03}.dat',
})
continue
try:
# Prepare our Attachment in Base64
small_attachments.append({
"@odata.type": "#microsoft.graph.fileAttachment",
# Name of the attachment (as it should appear in email)
"name": attachment.name
if attachment.name else f'file{no:03}.dat',
# MIME type of the attachment
"contentType": "attachment.mimetype",
# Base64 Content
"contentBytes": attachment.base64(),
})
except exception.AppriseException:
# We could not access the attachment
self.logger.error(
'Could not access Office 365 attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Appending Office 365 attachment {}'.format(
attachment.url(privacy=True)))
if small_attachments:
# Store Attachments
payload['message']['attachments'] = small_attachments
while len(emails):
# authenticate ourselves if we aren't already; but this function
# also tracks if our token we have is still valid and will
@ -347,63 +469,197 @@ class NotifyOffice365(NotifyBase):
bcc = (self.bcc - set([to_addr]))
# Prepare our email
payload['Message']['ToRecipients'] = [{
'EmailAddress': {
'Address': to_addr
payload['message']['toRecipients'] = [{
'emailAddress': {
'address': to_addr
}
}]
if to_name:
# Apply our To Name
payload['Message']['ToRecipients'][0]['EmailAddress']['Name'] \
payload['message']['toRecipients'][0]['emailAddress']['name'] \
= to_name
self.logger.debug('Email To: {}'.format(to_addr))
self.logger.debug('{}Email To: {}'.format(
'Draft' if large_attachments else '', to_addr))
if cc:
# Prepare our CC list
payload['Message']['CcRecipients'] = []
payload['message']['ccRecipients'] = []
for addr in cc:
_payload = {'Address': addr}
_payload = {'address': addr}
if self.names.get(addr):
_payload['Name'] = self.names[addr]
_payload['name'] = self.names[addr]
# Store our address in our payload
payload['Message']['CcRecipients']\
.append({'EmailAddress': _payload})
payload['message']['ccRecipients']\
.append({'emailAddress': _payload})
self.logger.debug('Email Cc: {}'.format(', '.join(
['{}{}'.format(
'' if self.names.get(e)
else '{}: '.format(self.names[e]), e) for e in cc])))
self.logger.debug('{}Email Cc: {}'.format(
'Draft' if large_attachments else '', ', '.join(
['{}{}'.format(
'' if self.names.get(e)
else '{}: '.format(
self.names[e]), e) for e in cc])))
if bcc:
# Prepare our CC list
payload['Message']['BccRecipients'] = []
payload['message']['bccRecipients'] = []
for addr in bcc:
_payload = {'Address': addr}
_payload = {'address': addr}
if self.names.get(addr):
_payload['Name'] = self.names[addr]
_payload['name'] = self.names[addr]
# Store our address in our payload
payload['Message']['BccRecipients']\
.append({'EmailAddress': _payload})
payload['message']['bccRecipients']\
.append({'emailAddress': _payload})
self.logger.debug('Email Bcc: {}'.format(', '.join(
['{}{}'.format(
'' if self.names.get(e)
else '{}: '.format(self.names[e]), e) for e in bcc])))
self.logger.debug('{}Email Bcc: {}'.format(
'Draft' if large_attachments else '', ', '.join(
['{}{}'.format(
'' if self.names.get(e)
else '{}: '.format(
self.names[e]), e) for e in bcc])))
# Perform upstream fetch
# Perform upstream post
postokay, response = self._fetch(
url=url, payload=dumps(payload),
content_type='application/json')
url=url if not large_attachments
else draft_url, payload=payload)
# Test if we were okay
if not postokay:
has_error = True
elif large_attachments:
# We have large attachments now to upload and associate with
# our message. We need to prepare a draft message; acquire
# the message-id associated with it and then attach the file
# via this means.
# Acquire our Draft ID to work with
message_id = response.get("id")
if not message_id:
self.logger.warning(
'Email Draft ID could not be retrieved')
has_error = True
continue
self.logger.debug('Email Draft ID: {}'.format(message_id))
# In future, the below could probably be called via async
has_attach_error = False
for attachment in large_attachments:
if not self.upload_attachment(
attachment['obj'], message_id, attachment['name']):
self.logger.warning(
'Could not prepare attachment session for %s',
attachment['name'])
has_error = True
has_attach_error = True
# Take early exit
break
if has_attach_error:
continue
# Send off our draft
attach_url = \
"https://graph.microsoft.com/v1.0/users/" \
"{}/messages/{}/send"
attach_url = attach_url.format(
self.source,
message_id,
)
# Trigger our send
postokay, response = self._fetch(url=url)
if not postokay:
self.logger.warning(
'Could not send drafted email id: {} ', message_id)
has_error = True
continue
# Memory management
del small_attachments
del large_attachments
del drafts
return not has_error
def upload_attachment(self, attachment, message_id, name=None):
"""
Uploads an attachment to a session
"""
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access Office 365 attachment {}.'.format(
attachment.url(privacy=True)))
return False
# Our Session URL
url = \
'{graph_url}/v1.0/users/{userid}/message/{message_id}' \
.format(
graph_url=self.graph_url,
userid=self.source,
message_id=message_id,
) + '/attachments/createUploadSession'
file_size = len(attachment)
payload = {
"AttachmentItem": {
"attachmentType": "file",
"name": name if name else (
attachment.name
if attachment.name else '{}.dat'.format(str(uuid4()))),
# MIME type of the attachment
"contentType": attachment.mimetype,
"size": file_size,
}
}
if not self.authenticate():
# We could not authenticate ourselves; we're done
return False
# Get our Upload URL
postokay, response = self._fetch(url, payload)
if not postokay:
return False
upload_url = response.get('uploadUrl')
if not upload_url:
return False
start_byte = 0
postokay = False
response = None
for chunk in attachment.chunk():
end_byte = start_byte + len(chunk) - 1
# Define headers for this chunk
headers = {
'User-Agent': self.app_id,
'Content-Length': str(len(chunk)),
'Content-Range':
f'bytes {start_byte}-{end_byte}/{file_size}'
}
# Upload the chunk
postokay, response = self._fetch(
upload_url, chunk, headers=headers, content_type=None,
method='PUT')
if not postokay:
return False
# Return our Upload URL
return postokay
def authenticate(self):
"""
Logs into and acquires us an authentication token to work with
@ -420,12 +676,12 @@ class NotifyOffice365(NotifyBase):
# Prepare our payload
payload = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.secret,
'scope': '{graph_url}/{scope}'.format(
graph_url=self.graph_url,
scope=self.scope),
'grant_type': 'client_credentials',
}
# Prepare our URL
@ -453,7 +709,9 @@ class NotifyOffice365(NotifyBase):
# "correlation_id": "fb3d2015-bc17-4bb9-bb85-30c5cf1aaaa7"
# }
postokay, response = self._fetch(url=url, payload=payload)
postokay, response = self._fetch(
url=url, payload=payload,
content_type='application/x-www-form-urlencoded')
if not postokay:
return False
@ -480,18 +738,19 @@ class NotifyOffice365(NotifyBase):
# We're authenticated
return True if self.token else False
def _fetch(self, url, payload,
content_type='application/x-www-form-urlencoded'):
def _fetch(self, url, payload=None, headers=None,
content_type='application/json', method='POST'):
"""
Wrapper to request object
"""
# Prepare our headers:
headers = {
'User-Agent': self.app_id,
'Content-Type': content_type,
}
if not headers:
headers = {
'User-Agent': self.app_id,
'Content-Type': content_type,
}
if self.token:
# Are we authenticated?
@ -501,36 +760,84 @@ class NotifyOffice365(NotifyBase):
content = {}
# Some Debug Logging
self.logger.debug('Office 365 POST URL: {} (cert_verify={})'.format(
url, self.verify_certificate))
self.logger.debug('Office 365 %s URL: {} (cert_verify={})'.format(
url, self.verify_certificate), method)
self.logger.debug('Office 365 Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
# fetch function
req = requests.post if method == 'POST' else (
requests.put if method == 'PUT' else requests.get)
try:
r = requests.post(
r = req(
url,
data=payload,
data=json.dumps(payload)
if content_type and content_type.endswith('/json')
else payload,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code not in (
requests.codes.ok, requests.codes.accepted):
requests.codes.ok, requests.codes.created,
requests.codes.accepted):
# We had a problem
status_str = \
NotifyOffice365.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Office 365 POST to {}: '
'Failed to send Office 365 %s to {}: '
'{}error={}.'.format(
url,
', ' if status_str else '',
r.status_code))
r.status_code), method)
# A Response could look like this if a Scope element was not
# found:
# {
# "error": {
# "code": "MissingClaimType",
# "message":"The token is missing the claim type \'oid\'.",
# "innerError": {
# "oAuthEventOperationId":" 7abe20-339f-4659-9381-38f52",
# "oAuthEventcV": "xsOSpAHSHVm3Tp4SNH5oIA.1.1",
# "errorUrl": "https://url",
# "requestId": "2328ea-ec9e-43a8-80f4-164c",
# "date":"2024-12-01T02:03:13"
# }}
# }
# Error 403; the below is returned if he User.Read.All
# Application scope is not set and a lookup is
# attempted.
# {
# "error": {
# "code": "Authorization_RequestDenied",
# "message":
# "Insufficient privileges to complete the operation.",
# "innerError": {
# "date": "2024-12-06T00:15:57",
# "request-id":
# "48fdb3e7-2f1a-4f45-a5a0-99b8b851278b",
# "client-request-id": "48f-2f1a-4f45-a5a0-99b8"
# }
# }
# }
# Another response type (error 415):
# {
# "error": {
# "code": "RequestBodyRead",
# "message": "A missing or empty content type header was \
# found when trying to read a message. The content \
# type header is required.",
# }
# }
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
@ -539,7 +846,7 @@ class NotifyOffice365(NotifyBase):
return (False, content)
try:
content = loads(r.content)
content = json.loads(r.content)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
@ -549,8 +856,8 @@ class NotifyOffice365(NotifyBase):
except requests.RequestException as e:
self.logger.warning(
'Exception received when sending Office 365 POST to {}: '.
format(url))
'Exception received when sending Office 365 %s to {}: '.
format(url), method)
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
@ -566,7 +873,7 @@ class NotifyOffice365(NotifyBase):
here.
"""
return (
self.secure_protocol, self.email, self.tenant, self.client_id,
self.secure_protocol[0], self.source, self.tenant, self.client_id,
self.secret,
)
@ -575,7 +882,7 @@ class NotifyOffice365(NotifyBase):
Returns the URL built dynamically based on specified arguments.
"""
# Our URL parameters
# Extend our parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
if self.cc:
@ -592,13 +899,13 @@ class NotifyOffice365(NotifyBase):
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.bcc])
return '{schema}://{tenant}:{email}/{client_id}/{secret}' \
return '{schema}://{source}/{tenant}/{client_id}/{secret}' \
'/{targets}/?{params}'.format(
schema=self.secure_protocol,
schema=self.secure_protocol[0],
tenant=self.pprint(self.tenant, privacy, safe=''),
# email does not need to be escaped because it should
# already be a valid host and username at this point
email=self.email,
source=self.source,
client_id=self.pprint(self.client_id, privacy, safe=''),
secret=self.pprint(
self.secret, privacy, mode=PrivacyMode.Secret,
@ -606,7 +913,7 @@ class NotifyOffice365(NotifyBase):
targets='/'.join(
[NotifyOffice365.quote('{}{}'.format(
'' if not e[0] else '{}:'.format(e[0]), e[1]),
safe='') for e in self.targets]),
safe='@') for e in self.targets]),
params=NotifyOffice365.urlencode(params))
def __len__(self):
@ -635,16 +942,52 @@ class NotifyOffice365(NotifyBase):
# of the secret key (since it can contain slashes in it)
entries = NotifyOffice365.split_path(results['fullpath'])
try:
# Initialize our tenant
results['tenant'] = None
# Initialize our email
results['email'] = None
# From Email
if 'from' in results['qsd'] and \
len(results['qsd']['from']):
# Extract the sending account's information
results['source'] = \
NotifyOffice365.unquote(results['qsd']['from'])
# If tenant is occupied, then the user defined makes up our source
elif results['user']:
results['source'] = '{}@{}'.format(
NotifyOffice365.unquote(results['user']),
NotifyOffice365.unquote(results['host']),
)
else:
# Object ID instead of email
results['source'] = NotifyOffice365.unquote(results['host'])
# Tenant
if 'tenant' in results['qsd'] and len(results['qsd']['tenant']):
# Extract the Tenant from the argument
results['tenant'] = \
NotifyOffice365.unquote(results['qsd']['tenant'])
elif entries:
results['tenant'] = NotifyOffice365.unquote(entries.pop(0))
# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])
elif entries:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyOffice365.unquote(entries.pop(0))
except IndexError:
# no problem, we may get the client_id another way through
# arguments...
pass
#
# Prepare our target listing
#
results['targets'] = list()
while entries:
# Pop the last entry
@ -662,36 +1005,6 @@ class NotifyOffice365(NotifyBase):
# We're done
break
# Initialize our tenant
results['tenant'] = None
# Assemble our secret key which is a combination of the host followed
# by all entries in the full path that follow up until the first email
results['secret'] = '/'.join(
[NotifyOffice365.unquote(x) for x in entries])
# Assemble our client id from the user@hostname
if results['password']:
results['email'] = '{}@{}'.format(
NotifyOffice365.unquote(results['password']),
NotifyOffice365.unquote(results['host']),
)
# Update our tenant
results['tenant'] = NotifyOffice365.unquote(results['user'])
else:
# No tenant specified..
results['email'] = '{}@{}'.format(
NotifyOffice365.unquote(results['user']),
NotifyOffice365.unquote(results['host']),
)
# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])
# OAuth2 Secret
if 'oauth_secret' in results['qsd'] and \
len(results['qsd']['oauth_secret']):
@ -699,19 +1012,12 @@ class NotifyOffice365(NotifyBase):
results['secret'] = \
NotifyOffice365.unquote(results['qsd']['oauth_secret'])
# Tenant
if 'from' in results['qsd'] and \
len(results['qsd']['from']):
# Extract the sending account's information
results['email'] = \
NotifyOffice365.unquote(results['qsd']['from'])
# Tenant
if 'tenant' in results['qsd'] and \
len(results['qsd']['tenant']):
# Extract the Tenant from the argument
results['tenant'] = \
NotifyOffice365.unquote(results['qsd']['tenant'])
else:
# Assemble our secret key which is a combination of the host
# followed by all entries in the full path that follow up until
# the first email
results['secret'] = '/'.join(
[NotifyOffice365.unquote(x) for x in entries])
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration

View File

@ -89,6 +89,31 @@ def test_file_expiry(tmpdir):
assert aa.exists()
def test_attach_mimetype():
"""
API: AttachFile MimeType()
"""
# Simple gif test
path = join(TEST_VAR_DIR, 'apprise-test.gif')
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
assert response.path == path
assert response.name == 'apprise-test.gif'
assert response.mimetype == 'image/gif'
# Force mimetype
response._mimetype = None
response.detected_mimetype = None
assert response.mimetype == 'image/gif'
response._mimetype = None
response.detected_mimetype = None
with mock.patch('mimetypes.guess_type', side_effect=TypeError):
assert response.mimetype == 'application/octet-stream'
def test_attach_file():
"""
API: AttachFile()
@ -105,6 +130,18 @@ def test_attach_file():
# results from cache
assert response.download()
with mock.patch('os.path.isfile', side_effect=OSError):
assert response.exists() is False
with mock.patch('os.path.isfile', return_value=False):
assert response.exists() is False
# Test that our file exists
assert response.exists() is True
response.cache = True
# Leverage always-cached flag
assert response.exists() is True
# On Windows, it is `file://D%3A%5Ca%5Capprise%5Capprise%5Ctest%5Cvar%5Capprise-test.gif`. # noqa E501
# TODO: Review - is this correct?
path_in_url = urllib.parse.quote(path)
@ -213,6 +250,23 @@ def test_attach_file():
aa = AppriseAttachment(location=ContentLocation.HOSTED)
assert aa.add(path) is False
response = AppriseAttachment.instantiate(path)
assert len(response) > 0
# Get file
assert response.download()
# Test the inability to get our file size
with mock.patch('os.path.getsize', side_effect=(0, OSError)):
assert len(response) == 0
# get file again
assert response.download()
with mock.patch('os.path.isfile', return_value=True):
response.cache = True
with mock.patch('os.path.getsize', side_effect=OSError):
assert len(response) == 0
def test_attach_file_base64():
"""

View File

@ -34,6 +34,8 @@ import requests
from datetime import datetime
from json import dumps
from apprise import Apprise
from apprise import NotifyType
from apprise import AppriseAttachment
from apprise.plugins.office365 import NotifyOffice365
from helpers import AppriseURLTester
@ -57,7 +59,7 @@ apprise_url_tests = (
# invalid url
'instance': TypeError,
}),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
# invalid tenant
tenant=',',
cid='ab-cd-ef-gh',
@ -65,24 +67,24 @@ apprise_url_tests = (
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
# Expected failure
'instance': TypeError,
}),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
# invalid client id
cid='ab.',
aid='user@example.com',
aid='user2@example.com',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
# Expected failure
'instance': TypeError,
}),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{tenant}/{cid}/{secret}/{targets}'.format(
# email not required if mode is set to self
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.com',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
@ -93,17 +95,105 @@ apprise_url_tests = (
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
'mail': 'user@example.ca',
},
}),
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.edu',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
'instance': NotifyOffice365,
# Test what happens if a batch send fails to return a messageCount
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
# For 'From:' Lookup
'mail': 'user@example.ca',
},
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'o365://t...t:user@example.com/a...h/'
'****/email1%40test.ca/'}),
'privacy_url': 'azure://user@example.edu/t...t/a...h/'
'****/email1@test.ca/'}),
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
# Source can also be Object ID
aid='hg-fe-dc-ba',
secret='abcd/123/3343/@jack/test',
targets='/'.join(['email1@test.ca'])), {
# We're valid and good to go
'instance': NotifyOffice365,
# Test what happens if a batch send fails to return a messageCount
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
'mail': 'user@example.ca',
"displayName": "John",
},
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'azure://hg-fe-dc-ba/t...t/a...h/'
'****/email1@test.ca/'}),
# ObjectID Specified, but no targets
('o365://{aid}/{tenant}/{cid}/{secret}/'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
# Source can also be Object ID
aid='hg-fe-dc-ba',
secret='abcd/123/3343/@jack/test'), {
# We're valid and good to go
'instance': NotifyOffice365,
# Test what happens if a batch send fails to return a messageCount
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
'mail': 'user@example.ca',
},
# No emails detected
'notify_response': False,
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'azure://hg-fe-dc-ba/t...t/a...h/****'}),
# ObjectID Specified, but no targets
('o365://{aid}/{tenant}/{cid}/{secret}/'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
# Source can also be Object ID
aid='hg-fe-dc-ba',
secret='abcd/123/3343/@jack/test'), {
# We're valid and good to go
'instance': NotifyOffice365,
# Test what happens if a batch send fails to return a messageCount
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
'userPrincipalName': 'user@example.ca',
},
# No emails detected
'notify_response': False,
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'azure://hg-fe-dc-ba/t...t/a...h/****'}),
# test our arguments
('o365://_/?oauth_id={cid}&oauth_secret={secret}&tenant={tenant}'
'&to={targets}&from={aid}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.com',
aid='user@example.ca',
secret='abcd/123/3343/@jack/test',
targets='email1@test.ca'),
{
@ -114,13 +204,14 @@ apprise_url_tests = (
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
'mail': 'user@example.ca',
},
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'o365://t...t:user@example.com/a...h/'
'****/email1%40test.ca/'}),
'privacy_url': 'azure://user@example.ca/t...t/a...h/'
'****/email1@test.ca/'}),
# Test invalid JSON (no tenant defaults to email domain)
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.com',
@ -135,7 +226,7 @@ apprise_url_tests = (
'notify_response': False,
}),
# No Targets specified
('o365://{tenant}:{aid}/{cid}/{secret}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}'.format(
tenant='tenant',
cid='ab-cd-ef-gh',
aid='user@example.com',
@ -148,9 +239,10 @@ apprise_url_tests = (
'requests_response_text': {
'expires_in': 2000,
'access_token': 'abcd1234',
'userPrincipalName': 'user@example.ca',
},
}),
('o365://{tenant}:{aid}/{cid}/{secret}/{targets}'.format(
('o365://{aid}/{tenant}/{cid}/{secret}/{targets}'.format(
tenant='tenant',
cid='zz-zz-zz-zz',
aid='user@example.com',
@ -185,8 +277,9 @@ def test_plugin_office365_urls():
AppriseURLTester(tests=apprise_url_tests).run_all()
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_plugin_office365_general(mock_post):
def test_plugin_office365_general(mock_get, mock_post):
"""
NotifyOffice365() General Testing
@ -200,19 +293,24 @@ def test_plugin_office365_general(mock_post):
targets = 'target@example.com'
# Prepare Mock return object
authentication = {
payload = {
"token_type": "Bearer",
"expires_in": 6000,
"access_token": "abcd1234"
"access_token": "abcd1234",
# For 'From:' Lookup
"mail": "abc@example.ca",
# For our Draft Email ID:
"id": "draft-id-no",
}
response = mock.Mock()
response.content = dumps(authentication)
response.content = dumps(payload)
response.status_code = requests.codes.ok
mock_post.return_value = response
mock_get.return_value = response
# Instantiate our object
obj = Apprise.instantiate(
'o365://{tenant}:{email}/{tenant}/{secret}/{targets}'.format(
'o365://{email}/{tenant}/{secret}/{targets}'.format(
tenant=tenant,
email=email,
secret=secret,
@ -228,10 +326,11 @@ def test_plugin_office365_general(mock_post):
# Instantiate our object
obj = Apprise.instantiate(
'o365://{tenant}:{email}/{tenant}/{secret}/{targets}'
'o365://{email}/{tenant}/{client_id}/{secret}/{targets}'
'?bcc={bcc}&cc={cc}'.format(
tenant=tenant,
email=email,
client_id=client_id,
secret=secret,
targets=targets,
# Test the cc and bcc list (use good and bad email)
@ -257,26 +356,6 @@ def test_plugin_office365_general(mock_post):
targets=None,
)
with pytest.raises(TypeError):
# Invalid email
NotifyOffice365(
email=None,
client_id=client_id,
tenant=tenant,
secret=secret,
targets=None,
)
with pytest.raises(TypeError):
# Invalid email
NotifyOffice365(
email='garbage',
client_id=client_id,
tenant=tenant,
secret=secret,
targets=None,
)
# One of the targets are invalid
obj = NotifyOffice365(
email=email,
@ -301,8 +380,9 @@ def test_plugin_office365_general(mock_post):
assert obj.notify(title='title', body='test') is False
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_plugin_office365_authentication(mock_post):
def test_plugin_office365_authentication(mock_get, mock_post):
"""
NotifyOffice365() Authentication Testing
@ -333,10 +413,11 @@ def test_plugin_office365_authentication(mock_post):
response.content = dumps(authentication_okay)
response.status_code = requests.codes.ok
mock_post.return_value = response
mock_get.return_value = response
# Instantiate our object
obj = Apprise.instantiate(
'o365://{tenant}:{email}/{client_id}/{secret}/{targets}'.format(
'azure://{email}/{tenant}/{client_id}/{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
email=email,
@ -394,3 +475,408 @@ def test_plugin_office365_authentication(mock_post):
del invalid_auth_entries['expires_in']
response.content = dumps(invalid_auth_entries)
assert obj.authenticate() is False
@mock.patch('requests.put')
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_plugin_office365_queries(mock_post, mock_get, mock_put):
"""
NotifyOffice365() General Queries
"""
# Initialize some generic (but valid) tokens
source = 'abc-1234-object-id'
tenant = 'ff-gg-hh-ii-jj'
client_id = 'aa-bb-cc-dd-ee'
secret = 'abcd/1234/abcd@ajd@/test'
targets = 'target@example.ca'
# Prepare Mock return object
payload = {
"token_type": "Bearer",
"expires_in": 6000,
"access_token": "abcd1234",
# For 'From:' Lookup (email)
"mail": "user@example.edu",
# For 'From:' Lookup (name)
"displayName": "John",
# For our Draft Email ID:
"id": "draft-id-no",
# For FIle Uploads
"uploadUrl": "https://my.url.path/"
}
okay_response = mock.Mock()
okay_response.content = dumps(payload)
okay_response.status_code = requests.codes.ok
mock_post.return_value = okay_response
mock_put.return_value = okay_response
bad_response = mock.Mock()
bad_response.content = dumps(payload)
bad_response.status_code = requests.codes.forbidden
# Assign our GET a bad response so we fail to look up the user
mock_get.return_value = bad_response
# Instantiate our object
obj = Apprise.instantiate(
'azure://{source}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
source=source,
secret=secret,
targets=targets))
assert isinstance(obj, NotifyOffice365)
# We can still send a notification even if we can't look up the email
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://login.microsoftonline.com/{}/oauth2/v2.0/token'.format(tenant)
assert mock_post.call_args_list[1][0][0] == \
'https://graph.microsoft.com/v1.0/users/abc-1234-object-id/sendMail'
mock_post.reset_mock()
# Now test a case where we just couldn't get any email details from the
# payload returned
# Prepare Mock return object
temp_payload = {
"token_type": "Bearer",
"expires_in": 6000,
"access_token": "abcd1234",
# For our Draft Email ID:
"id": "draft-id-no",
# For FIle Uploads
"uploadUrl": "https://my.url.path/"
}
bad_response.content = dumps(temp_payload)
bad_response.status_code = requests.codes.okay
mock_get.return_value = bad_response
obj = Apprise.instantiate(
'azure://{source}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
source=source,
secret=secret,
targets=targets))
assert isinstance(obj, NotifyOffice365)
# We can still send a notification even if we can't look up the email
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is True
@mock.patch('requests.put')
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_plugin_office365_attachments(mock_post, mock_get, mock_put):
"""
NotifyOffice365() Attachments
"""
# Initialize some generic (but valid) tokens
source = 'user@example.net'
tenant = 'ff-gg-hh-ii-jj'
client_id = 'aa-bb-cc-dd-ee'
secret = 'abcd/1234/abcd@ajd@/test'
targets = 'target@example.com'
# Prepare Mock return object
payload = {
"token_type": "Bearer",
"expires_in": 6000,
"access_token": "abcd1234",
# For 'From:' Lookup
"mail": "user@example.edu",
# For our Draft Email ID:
"id": "draft-id-no",
# For FIle Uploads
"uploadUrl": "https://my.url.path/"
}
okay_response = mock.Mock()
okay_response.content = dumps(payload)
okay_response.status_code = requests.codes.ok
mock_post.return_value = okay_response
mock_get.return_value = okay_response
mock_put.return_value = okay_response
# Instantiate our object
obj = Apprise.instantiate(
'azure://{source}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
source=source,
secret=secret,
targets=targets))
assert isinstance(obj, NotifyOffice365)
# Test Valid Attachment
path = os.path.join(TEST_VAR_DIR, 'apprise-test.gif')
attach = AppriseAttachment(path)
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://login.microsoftonline.com/{}/oauth2/v2.0/token'.format(tenant)
assert mock_post.call_args_list[0][1]['headers'] \
.get('Content-Type') == 'application/x-www-form-urlencoded'
assert mock_post.call_args_list[1][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/sendMail'.format(source)
assert mock_post.call_args_list[1][1]['headers'] \
.get('Content-Type') == 'application/json'
mock_post.reset_mock()
# Test Authentication Failure
obj = Apprise.instantiate(
'azure://{source}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
source='object-id-requiring-lookup',
secret=secret,
targets=targets))
bad_response = mock.Mock()
bad_response.content = dumps(payload)
bad_response.status_code = requests.codes.forbidden
mock_post.return_value = bad_response
assert isinstance(obj, NotifyOffice365)
# Authentication will fail
assert obj.notify(
body='auth-fail', title='title', notify_type=NotifyType.INFO) is False
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://login.microsoftonline.com/ff-gg-hh-ii-jj/oauth2/v2.0/token'
mock_post.reset_mock()
#
# Test invalid attachment
#
# Instantiate our object
obj = Apprise.instantiate(
'azure://{source}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
source=source,
secret=secret,
targets=targets))
assert isinstance(obj, NotifyOffice365)
mock_post.return_value = okay_response
path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=path) is False
assert mock_post.call_count == 0
mock_post.reset_mock()
with mock.patch('base64.b64encode', side_effect=OSError()):
# We can't send the message if we fail to parse the data
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
assert mock_post.call_count == 0
mock_post.reset_mock()
#
# Test case where we can't authenticate
#
obj = Apprise.instantiate(
'azure://{source}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
source=source,
secret=secret,
targets=targets))
# Force a smaller attachment size forcing us to create an attachment
obj.outlook_attachment_inline_max = 50
assert isinstance(obj, NotifyOffice365)
path = os.path.join(TEST_VAR_DIR, 'apprise-test.gif')
attach = AppriseAttachment(path)
mock_post.return_value = bad_response
assert obj.upload_attachment(attach[0], 'id') is False
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://login.microsoftonline.com/ff-gg-hh-ii-jj/oauth2/v2.0/token'
mock_post.reset_mock()
mock_post.side_effect = (okay_response, bad_response)
mock_post.return_value = None
assert obj.upload_attachment(attach[0], 'id') is False
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://login.microsoftonline.com/ff-gg-hh-ii-jj/oauth2/v2.0/token'
assert mock_post.call_args_list[1][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/'.format(source) + \
'message/id/attachments/createUploadSession'
mock_post.reset_mock()
# Return our status
mock_post.side_effect = None
# Prepare Mock return object
payload_no_upload_url = {
"token_type": "Bearer",
"expires_in": 6000,
"access_token": "abcd1234",
# For 'From:' Lookup
"mail": "user@example.edu",
# For our Draft Email ID:
"id": "draft-id-no",
}
tmp_response = mock.Mock()
tmp_response.content = dumps(payload_no_upload_url)
tmp_response.status_code = requests.codes.ok
mock_post.return_value = tmp_response
assert obj.upload_attachment(attach[0], 'id') is False
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/'.format(source) + \
'message/id/attachments/createUploadSession'
mock_post.reset_mock()
# Return our status
mock_post.side_effect = None
mock_post.return_value = okay_response
obj = Apprise.instantiate(
'azure://{source}/{tenant}/{client_id}{secret}/{targets}'.format(
client_id=client_id,
tenant=tenant,
source=source,
secret=secret,
targets=targets))
# Force a smaller attachment size forcing us to create an attachment
obj.outlook_attachment_inline_max = 50
assert isinstance(obj, NotifyOffice365)
# We now have to prepare sepparate session attachments using draft emails
assert obj.notify(
body='body', title='title-test', notify_type=NotifyType.INFO,
attach=attach) is True
# Large Attachments
assert mock_post.call_count == 4
assert mock_post.call_args_list[0][0][0] == \
'https://login.microsoftonline.com/ff-gg-hh-ii-jj/oauth2/v2.0/token'
assert mock_post.call_args_list[1][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/messages'.format(source)
assert mock_post.call_args_list[2][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/'.format(source) + \
'message/draft-id-no/attachments/createUploadSession'
assert mock_post.call_args_list[3][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/sendMail'.format(source)
mock_post.reset_mock()
#
# Handle another case where can't upload the attachment at all
#
path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
bad_attach = AppriseAttachment(path)
assert obj.upload_attachment(bad_attach[0], 'id') is False
mock_post.reset_mock()
#
# Handle test case where we can't send the draft email after everything
# has been prepared
#
mock_post.return_value = None
mock_post.side_effect = (okay_response, okay_response, bad_response)
assert obj.notify(
body='body', title='title-test', notify_type=NotifyType.INFO,
attach=attach) is False
assert mock_post.call_count == 3
assert mock_post.call_args_list[0][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/messages'.format(source)
assert mock_post.call_args_list[1][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/'.format(source) + \
'message/draft-id-no/attachments/createUploadSession'
assert mock_post.call_args_list[2][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/sendMail'.format(source)
mock_post.reset_mock()
mock_post.side_effect = None
mock_post.return_value = okay_response
#
# Handle test case where we can not upload chunks
#
mock_put.return_value = bad_response
# We now have to prepare sepparate session attachments using draft emails
assert obj.notify(
body='body', title='title-no-chunk', notify_type=NotifyType.INFO,
attach=attach) is False
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/messages'.format(source)
assert mock_post.call_args_list[1][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/'.format(source) + \
'message/draft-id-no/attachments/createUploadSession'
mock_put.return_value = okay_response
mock_post.reset_mock()
# Prepare Mock return object
payload_missing_id = {
"token_type": "Bearer",
"expires_in": 6000,
"access_token": "abcd1234",
# For 'From:' Lookup
"mail": "user@example.edu",
# For FIle Uploads
"uploadUrl": "https://my.url.path/"
}
temp_response = mock.Mock()
temp_response.content = dumps(payload_missing_id)
temp_response.status_code = requests.codes.ok
mock_post.return_value = temp_response
# We could not acquire an attachment id, so we'll fail to send our
# notification
assert obj.notify(
body='body', title='title-test', notify_type=NotifyType.INFO,
attach=attach) is False
# Large Attachments
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://graph.microsoft.com/v1.0/users/user@example.net/messages'
mock_post.reset_mock()
# Reset attachment size
obj.outlook_attachment_inline_max = 50 * 1024 * 1024
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# already authenticated
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://graph.microsoft.com/v1.0/users/{}/sendMail'.format(source)
mock_post.reset_mock()