Merge pull request #81 from caronc/80-matrix-refactored

Complete refactoring of entire Matrix plugin; refs #80
This commit is contained in:
Chris Caron 2019-03-10 15:35:04 -04:00 committed by GitHub
commit b5b95543a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1145 additions and 186 deletions

View File

@ -42,7 +42,7 @@ The table below identifies the services this tool supports and some example serv
| [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/Event<br />ifttt://webhooksID/Event1/Event2/EventN<br/>ifttt://webhooksID/Event1/?+Key=Value<br/>ifttt://webhooksID/Event1/?-Key=value1
| [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device<br />join://apikey/device1/device2/deviceN/<br />join://apikey/group<br />join://apikey/groupA/groupB/groupN<br />join://apikey/DeviceA/groupA/groupN/DeviceN/
| [KODI](https://github.com/caronc/apprise/wiki/Notify_kodi) | kodi:// or kodis:// | (TCP) 8080 or 443 | kodi://hostname<br />kodi://user@hostname<br />kodi://user:password@hostname:port
| [Matrix](https://github.com/caronc/apprise/wiki/Notify_matrix) | matrix:// or matrixs:// | (TCP) 80 or 443 | matrix://token<br />matrix://user@token<br />matrixs://token?mode=slack<br />matrixs://user@token
| [Matrix](https://github.com/caronc/apprise/wiki/Notify_matrix) | matrix:// or matrixs:// | (TCP) 8008 or 8448 | matrix://hostname<br />matrix://user@hostname<br />matrixs://user:pass@hostname:port/#room_alias<br />matrixs://user:pass@hostname:port/!room_id<br />matrixs://user:pass@hostname:port/#room_alias/!room_id/#room2
| [Mattermost](https://github.com/caronc/apprise/wiki/Notify_mattermost) | mmost:// | (TCP) 8065 | mmost://hostname/authkey<br />mmost://hostname:80/authkey<br />mmost://user@hostname:80/authkey<br />mmost://hostname/authkey?channel=channel<br />mmosts://hostname/authkey<br />mmosts://user@hostname/authkey<br />
| [Prowl](https://github.com/caronc/apprise/wiki/Notify_prowl) | prowl:// | (TCP) 443 | prowl://apikey<br />prowl://apikey/providerkey
| [PushBullet](https://github.com/caronc/apprise/wiki/Notify_pushbullet) | pbul:// | (TCP) 443 | pbul://accesstoken<br />pbul://accesstoken/#channel<br/>pbul://accesstoken/A_DEVICE_ID<br />pbul://accesstoken/email@address.com<br />pbul://accesstoken/#channel/#channel2/email@address.net/DEVICE

View File

@ -128,9 +128,13 @@ class URLBase(object):
# is automatically set and controlled through the throttle() call.
self._last_io_datetime = None
def throttle(self, last_io=None):
def throttle(self, last_io=None, wait=None):
"""
A common throttle control
if a wait is specified, then it will force a sleep of the
specified time if it is larger then the calculated throttle
time.
"""
if last_io is not None:
@ -156,13 +160,17 @@ class URLBase(object):
elapsed = (reference - self._last_io_datetime).total_seconds()
if elapsed < self.request_rate_per_sec:
if wait is not None:
self.logger.debug('Throttling for {}s...'.format(wait))
sleep(wait)
elif elapsed < self.request_rate_per_sec:
self.logger.debug('Throttling for {}s...'.format(
self.request_rate_per_sec - elapsed))
sleep(self.request_rate_per_sec - elapsed)
# Update our timestamp before we leave
self._last_io_datetime = reference
self._last_io_datetime = datetime.now()
return
def url(self):

View File

@ -23,32 +23,43 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# Great sources
# - https://github.com/matrix-org/matrix-python-sdk
# - https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst
#
import re
import six
import requests
from json import dumps
from time import time
from json import loads
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..common import NotifyImageSize
from ..utils import parse_bool
# Token required as part of the API request
VALIDATE_TOKEN = re.compile(r'[A-Za-z0-9]{64}')
# Define default path
MATRIX_V2_API_PATH = '/_matrix/client/r0'
# Extend HTTP Error Messages
MATRIX_HTTP_ERROR_MAP = {
403: 'Unauthorized - Invalid Token.',
429: 'Rate limit imposed; wait 2s and try again',
}
# Used to break apart list of potential tags by their delimiter
# into a usable list.
LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
class MatrixNotificationMode(object):
SLACK = "slack"
MATRIX = "matrix"
# Matrix Room Syntax
IS_ROOM_ALIAS = re.compile(
r'^\s*(#|%23)?(?P<room>[a-z0-9-]+)((:|%3A)'
r'(?P<home_server>[a-z0-9.-]+))?\s*$', re.I)
MATRIX_NOTIFICATION_MODES = (
MatrixNotificationMode.SLACK,
MatrixNotificationMode.MATRIX,
)
# Room ID MUST start with an exclamation to avoid ambiguity
IS_ROOM_ID = re.compile(
r'^\s*(!|&#33;|%21)(?P<room>[a-z0-9-]+)((:|%3A)'
r'(?P<home_server>[a-z0-9.-]+))?\s*$', re.I)
class NotifyMatrix(NotifyBase):
@ -68,176 +79,606 @@ class NotifyMatrix(NotifyBase):
# The default secure protocol
secure_protocol = 'matrixs'
# Define the default secure port to use
default_secure_port = 8448
# Define the default insecure port to use
default_insecure_port = 8008
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_matrix'
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_32
# The maximum allowable characters allowed in the body per message
body_maxlen = 1000
# Default User
matrix_default_user = 'apprise'
# Throttle a wee-bit to avoid thrashing
request_rate_per_sec = 0.5
def __init__(self, token, mode=MatrixNotificationMode.MATRIX, **kwargs):
# How many retry attempts we'll make in the event the server asks us to
# throttle back.
default_retries = 2
# The number of micro seconds to wait if we get a 429 error code and
# the server doesn't remind us how long we shoul wait for
default_wait_ms = 1000
def __init__(self, rooms=None, thumbnail=True, **kwargs):
"""
Initialize Matrix Object
"""
super(NotifyMatrix, self).__init__(**kwargs)
if self.secure:
self.schema = 'https'
# Prepare a list of rooms to connect and notify
if isinstance(rooms, six.string_types):
self.rooms = [x for x in filter(bool, LIST_DELIM.split(
rooms,
))]
elif isinstance(rooms, (set, tuple, list)):
self.rooms = rooms
else:
self.schema = 'http'
self.rooms = []
if not isinstance(self.port, int):
self.notify_url = '%s://%s/api/v1/matrix/hook' % (
self.schema, self.host)
# our home server gets populated after a login/registration
self.home_server = None
else:
self.notify_url = '%s://%s:%d/api/v1/matrix/hook' % (
self.schema, self.host, self.port)
# our user_id gets populated after a login/registration
self.user_id = None
if not VALIDATE_TOKEN.match(token.strip()):
self.logger.warning(
'The API token specified (%s) is invalid.' % token,
)
raise TypeError(
'The API token specified (%s) is invalid.' % token,
)
# This gets initialized after a login/registration
self.access_token = None
# The token associated with the webhook
self.token = token.strip()
# Place a thumbnail image inline with the message body
self.thumbnail = thumbnail
if not self.user:
self.logger.warning(
'No user was specified; using %s.' % self.matrix_default_user)
if mode not in MATRIX_NOTIFICATION_MODES:
self.logger.warning('The mode specified (%s) is invalid.' % mode)
raise TypeError('The mode specified (%s) is invalid.' % mode)
self.mode = mode
self._re_formatting_map = {
# New lines must become the string version
r'\r\*\n': '\\n',
# Escape other special characters
r'&': '&amp;',
r'<': '&lt;',
r'>': '&gt;',
}
# Iterate over above list and store content accordingly
self._re_formatting_rules = re.compile(
r'(' + '|'.join(self._re_formatting_map.keys()) + r')',
re.IGNORECASE,
)
# maintain a lookup of room alias's we already paired with their id
# to speed up future requests
self._room_cache = {}
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Matrix Notification
"""
if self.access_token is None:
# We need to register
if not self._login():
if not self._register():
return False
if len(self.rooms) == 0:
# Attempt to retrieve a list of already joined channels
self.rooms = self._joined_rooms()
if len(self.rooms) == 0:
# Nothing to notify
self.logger.warning(
'There were no Matrix rooms specified to notify.')
return False
# Create a copy of our rooms to join and message
rooms = list(self.rooms)
# Initiaize our error tracking
has_error = False
while len(rooms) > 0:
# Get our room
room = rooms.pop(0)
# Get our room_id from our response
room_id = self._room_join(room)
if not room_id:
# Notify our user about our failure
self.logger.warning(
'Could not join Matrix room {}.'.format((room)))
# Mark our failure
has_error = True
continue
# We have our data cached at this point we can freely use it
msg = '{title}{body}'.format(
title='' if not title else '{}\r\n'.format(title),
body=body)
image_url = self.image_url(notify_type)
if self.thumbnail and image_url:
# Define our payload
image_payload = {
'msgtype': 'm.image',
'url': image_url,
'body': '{}'.format(notify_type if not title else title),
}
# Build our path
path = '/rooms/{}/send/m.room.message'.format(
NotifyBase.quote(room_id))
# Post our content
postokay, response = self._fetch(path, payload=image_payload)
if not postokay:
# Mark our failure
has_error = True
continue
# Define our payload
payload = {
'msgtype': 'm.text',
'body': msg,
}
# Build our path
path = '/rooms/{}/send/m.room.message'.format(
NotifyBase.quote(room_id))
# Post our content
postokay, response = self._fetch(path, payload=payload)
if not postokay:
# Notify our user
self.logger.warning(
'Could not send notification Matrix room {}.'.format(room))
# Mark our failure
has_error = True
continue
return not has_error
def _register(self):
"""
Register with the service if possible.
"""
# Prepare our Registration Payload. This will only work if registration
# is enabled for the public
payload = {
'kind': 'user',
'auth': {'type': 'm.login.dummy'},
}
# parameters
params = {
'kind': 'user',
}
# If a user is not specified, one will be randomly generated for you.
# If you do not specify a password, you will be unable to login to the
# account if you forget the access_token.
if self.user:
payload['username'] = self.user
if self.password:
payload['password'] = self.password
# Register
postokay, response = \
self._fetch('/register', payload=payload, params=params)
if not postokay:
# Failed to register
return False
# Pull the response details
self.access_token = response.get('access_token')
self.home_server = response.get('home_server')
self.user_id = response.get('user_id')
if self.access_token is not None:
self.logger.debug(
'Registered successfully with Matrix server.')
return True
return False
def _login(self):
"""
Acquires the matrix token required for making future requests. If we
fail we return False, otherwise we return True
"""
if self.access_token:
# Login not required; silently skip-over
return True
if not (self.user and self.password):
# It's not possible to register since we need these 2 values to
# make the action possible.
self.logger.warning(
'Failed to login to Matrix server: '
'user/pass combo is missing.')
return False
# Prepare our Registration Payload
payload = {
'type': 'm.login.password',
'user': self.user,
'password': self.password,
}
# Build our URL
postokay, response = self._fetch('/login', payload=payload)
if not postokay:
# Failed to login
return False
# Pull the response details
self.access_token = response.get('access_token')
self.home_server = response.get('home_server')
self.user_id = response.get('user_id')
if not self.access_token:
return False
self.logger.debug(
'Authenticated successfully with Matrix server.')
return True
def _logout(self):
"""
Relinquishes token from remote server
"""
if not self.access_token:
# Login not required; silently skip-over
return True
# Prepare our Registration Payload
payload = {}
# Expire our token
postokay, response = self._fetch('/logout', payload=payload)
if not postokay:
# If we get here, the token was declared as having already
# been expired. The response looks like this:
# {
# u'errcode': u'M_UNKNOWN_TOKEN',
# u'error': u'Access Token unknown or expired',
# }
#
# In this case it's okay to safely return True because
# we're logged out in this case.
if response.get('errcode') != u'M_UNKNOWN_TOKEN':
return False
# else: The response object looks like this if we were successful:
# {}
# Pull the response details
self.access_token = None
self.home_server = None
self.user_id = None
# Clear our room cache
self._room_cache = {}
self.logger.debug(
'Unauthenticated successfully with Matrix server.')
return True
def _room_join(self, room):
"""
Joins a matrix room if we're not already in it. Otherwise it attempts
to create it if it doesn't exist and always returns
the room_id if it was successful, otherwise it returns None
"""
if not self.access_token:
# We can't join a room if we're not logged in
return None
if not isinstance(room, six.string_types):
# Not a supported string
return None
# Prepare our Join Payload
payload = {}
# Not in cache, next step is to check if it's a room id...
result = IS_ROOM_ID.match(room)
if result:
# We detected ourselves the home_server
home_server = result.group('home_server') \
if result.group('home_server') else self.home_server
# It was a room ID; simple mapping:
room_id = "!{}:{}".format(
result.group('room'),
home_server,
)
# Build our URL
path = '/join/{}'.format(NotifyBase.quote(room_id))
# Make our query
postokay, _ = self._fetch(path, payload=payload)
return room_id if postokay else None
# Try to see if it's an alias then...
result = IS_ROOM_ALIAS.match(room)
if not result:
# There is nothing else it could be
self.logger.warning(
'Ignoring illegally formed room {} '
'from Matrix server list.'.format(room))
return None
# If we reach here, we're dealing with a channel alias
home_server = self.home_server \
if not result.group('home_server') \
else result.group('home_server')
# tidy our room (alias) identifier
room = '#{}:{}'.format(result.group('room'), home_server)
# Check our cache for speed:
if room in self._room_cache:
# We're done as we've already joined the channel
return self._room_cache[room]['id']
# If we reach here, we need to join the channel
# Build our URL
path = '/join/{}'.format(NotifyBase.quote(room))
# Attempt to join the channel
postokay, response = self._fetch(path, payload=payload)
if postokay:
# Cache our entry for fast access later
self._room_cache[room] = {
'id': response.get('room_id'),
'home_server': home_server,
}
return self._room_cache[room]['id']
# Try to create the channel
return self._room_create(room)
def _room_create(self, room):
"""
Creates a matrix room and return it's room_id if successful
otherwise None is returned.
"""
if not self.access_token:
# We can't create a room if we're not logged in
return None
if not isinstance(room, six.string_types):
# Not a supported string
return None
# Build our room if we have to:
result = IS_ROOM_ALIAS.match(room)
if not result:
# Illegally formed room
return None
# Our home_server
home_server = result.group('home_server') \
if result.group('home_server') else self.home_server
# update our room details
room = '#{}:{}'.format(result.group('room'), home_server)
# Prepare our Create Payload
payload = {
'room_alias_name': result.group('room'),
# Set our channel name
'name': '#{} - {}'.format(result.group('room'), self.app_desc),
# hide the room by default; let the user open it up if they wish
# to others.
'visibility': 'private',
'preset': 'trusted_private_chat',
}
postokay, response = self._fetch('/createRoom', payload=payload)
if not postokay:
# Failed to create channel
# Typical responses:
# - {u'errcode': u'M_ROOM_IN_USE',
# u'error': u'Room alias already taken'}
# - {u'errcode': u'M_UNKNOWN',
# u'error': u'Internal server error'}
if (response and response.get('errcode') == 'M_ROOM_IN_USE'):
return self._room_id(room)
return None
# Cache our entry for fast access later
self._room_cache[response.get('room_alias')] = {
'id': response.get('room_id'),
'home_server': home_server,
}
return response.get('room_id')
def _joined_rooms(self):
"""
Returns a list of the current rooms the logged in user
is a part of.
"""
if not self.access_token:
# No list is possible
return list()
postokay, response = self._fetch(
'/joined_rooms', payload=None, fn=requests.get)
if not postokay:
# Failed to retrieve listings
return list()
# Return our list of rooms
return response.get('joined_rooms', list())
def _room_id(self, room):
"""Get room id from its alias.
Args:
room (str): The room alias name.
Returns:
returns the room id if it can, otherwise it returns None
"""
if not self.access_token:
# We can't get a room id if we're not logged in
return None
if not isinstance(room, six.string_types):
# Not a supported string
return None
# Build our room if we have to:
result = IS_ROOM_ALIAS.match(room)
if not result:
# Illegally formed room
return None
# Our home_server
home_server = result.group('home_server') \
if result.group('home_server') else self.home_server
# update our room details
room = '#{}:{}'.format(result.group('room'), home_server)
# Make our request
postokay, response = self._fetch(
"/directory/room/{}".format(
self.quote(room)), payload=None, fn=requests.get)
if postokay:
return response.get("room_id")
return None
def _fetch(self, path, payload=None, params=None, fn=requests.post):
"""
Wrapper to request.post() to manage it's response better and make
the send() function cleaner and easier to maintain.
This function returns True if the _post was successful and False
if it wasn't.
"""
# Define our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
}
# Perform Formatting
title = self._re_formatting_rules.sub( # pragma: no branch
lambda x: self._re_formatting_map[x.group()], title,
)
body = self._re_formatting_rules.sub( # pragma: no branch
lambda x: self._re_formatting_map[x.group()], body,
)
url = '%s/%s' % (
self.notify_url,
self.token,
)
if self.access_token is not None:
headers["Authorization"] = 'Bearer %s' % self.access_token
if self.mode == MatrixNotificationMode.MATRIX:
payload = self.__matrix_mode_payload(title, body, notify_type)
default_port = self.default_secure_port \
if self.secure else self.default_insecure_port
else:
payload = self.__slack_mode_payload(title, body, notify_type)
url = \
'{schema}://{hostname}:{port}{matrix_api}{path}'.format(
schema='https' if self.secure else 'http',
hostname=self.host,
port=default_port if self.port is None else self.port,
matrix_api=MATRIX_V2_API_PATH,
path=path)
# Our response object
response = {}
# Define how many attempts we'll make if we get caught in a throttle
# event
retries = self.default_retries if self.default_retries > 0 else 1
while retries > 0:
# Decrement our throttle retry count
retries -= 1
self.logger.debug('Matrix POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('Matrix Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
data=dumps(payload),
params=params,
headers=headers,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
response = loads(r.content)
if r.status_code == 429:
wait = self.default_wait_ms / 1000
try:
wait = response['retry_after_ms'] / 1000
except KeyError:
try:
errordata = response['error']
wait = errordata['retry_after_ms'] / 1000
except KeyError:
pass
self.logger.warning(
'Matrix server requested we throttle back {}ms; '
'retries left {}.'.format(wait, retries))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Throttle for specified wait
self.throttle(wait=wait)
# Try again
continue
elif r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(
r.status_code, MATRIX_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send Matrix notification: '
'Failed to handshake with Matrix server: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
return (False, response)
else:
self.logger.info('Sent Matrix notification.')
except ValueError:
# This gets thrown if we can't parse our JSON Response
self.logger.warning('Invalid response from Matrix server.')
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return (False, {})
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Matrix notification.'
)
'A Connection error occured while registering with Matrix'
' server.')
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return (False, response)
return True
return (True, response)
def __slack_mode_payload(self, title, body, notify_type):
# prepare JSON Object
payload = {
'username': self.user if self.user else self.matrix_default_user,
# Use Markdown language
'mrkdwn': True,
'attachments': [{
'title': title,
'text': body,
'color': self.color(notify_type),
'ts': time(),
'footer': self.app_id,
}],
}
# If we get here, we ran out of retries
return (False, {})
return payload
def __matrix_mode_payload(self, title, body, notify_type):
title = NotifyBase.escape_html(title)
body = NotifyBase.escape_html(body)
msg = '<h4>%s</h4>%s<br/>' % (title, body)
payload = {
'displayName':
self.user if self.user else self.matrix_default_user,
'format': 'html',
'text': msg,
}
return payload
def __del__(self):
"""
Ensure we relinquish our token
"""
self._logout()
def url(self):
"""
@ -248,25 +689,31 @@ class NotifyMatrix(NotifyBase):
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'mode': self.mode,
}
# Determine Authentication
# Determine Authentication method
auth = ''
if self.user:
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
default_port = self.default_secure_port \
if self.secure else self.default_insecure_port
return '{schema}://{auth}{host}/{token}{port}/?{args}'.format(
return '{schema}://{auth}{hostname}{port}/{rooms}?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
host=self.host,
auth=auth,
token=self.token,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
rooms=self.quote('/'.join(self.rooms)),
args=self.urlencode(args),
)
@ -283,11 +730,13 @@ class NotifyMatrix(NotifyBase):
# We're done early as we couldn't load the results
return results
# Apply our settings now
results['token'] = NotifyBase.unquote(results['query'])
# Get our rooms
results['rooms'] = [
NotifyBase.unquote(x) for x in filter(bool, NotifyBase.split_path(
results['fullpath']))][0:]
if 'mode' in results['qsd'] and len(results['qsd']['mode']):
results['mode'] = results['qsd']\
.get('mode', MatrixNotificationMode.MATRIX).lower()
# Use Thumbnail
results['thumbnail'] = \
parse_bool(results['qsd'].get('thumbnail', False))
return results

520
test/test_matrix_plugin.py Normal file
View File

@ -0,0 +1,520 @@
# -*- coding: utf-8 -*-
#
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
import mock
import requests
from apprise import plugins
from apprise import AppriseAsset
# from apprise import Apprise
from json import dumps
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_matrix_plugin_general(mock_post, mock_get):
"""
API: NotifyMatrix() General Tests
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
response_obj = {
'room_id': '!abc123:localhost',
'room_alias': '#abc123:localhost',
'joined_rooms': ['!abc123:localhost', '!def456:localhost'],
'access_token': 'abcd1234',
'home_server': 'localhost',
}
request = mock.Mock()
request.content = dumps(response_obj)
request.status_code = requests.codes.ok
# Prepare Mock
mock_get.return_value = request
mock_post.return_value = request
# Variation Initializations
obj = plugins.NotifyMatrix(rooms='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert isinstance(obj.url(), six.string_types) is True
# Registration successful
assert obj.send(body="test") is True
obj = plugins.NotifyMatrix(user='user', rooms='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert isinstance(obj.url(), six.string_types) is True
# Registration successful
assert obj.send(body="test") is True
obj = plugins.NotifyMatrix(password='passwd', rooms='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert isinstance(obj.url(), six.string_types) is True
# A username gets automatically generated in these cases
assert obj.send(body="test") is True
obj = plugins.NotifyMatrix(user='user', password='passwd', rooms='#abcd')
assert isinstance(obj.url(), six.string_types) is True
assert isinstance(obj, plugins.NotifyMatrix) is True
# Registration Successful
assert obj.send(body="test") is True
# Force a failed login
ro = response_obj.copy()
del ro['access_token']
request.content = dumps(ro)
request.status_code = 404
# Fails because we couldn't register because of 404 errors
assert obj.send(body="test") is False
obj = plugins.NotifyMatrix(user='test', rooms='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
# Fails because we still couldn't register
assert obj.send(user='test', password='passwd', body="test") is False
obj = plugins.NotifyMatrix(user='test', password='passwd', rooms='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
# Fails because we still couldn't register
assert obj.send(body="test") is False
obj = plugins.NotifyMatrix(password='passwd', rooms='#abcd')
# Fails because we still couldn't register
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.send(body="test") is False
# Force a empty joined list response
ro = response_obj.copy()
ro['joined_rooms'] = []
request.content = dumps(ro)
assert obj.send(user='test', password='passwd', body="test") is False
# Fall back to original template
request.content = dumps(response_obj)
request.status_code = requests.codes.ok
# update our response object so logins now succeed
response_obj['user_id'] = '@apprise:localhost'
# Login was successful but not get a room_id
ro = response_obj.copy()
del ro['room_id']
request.content = dumps(ro)
assert obj.send(user='test', password='passwd', body="test") is False
# Fall back to original template
request.content = dumps(response_obj)
request.status_code = requests.codes.ok
obj = plugins.NotifyMatrix(rooms=None)
assert isinstance(obj, plugins.NotifyMatrix) is True
# Force a empty joined list response
ro = response_obj.copy()
ro['joined_rooms'] = []
request.content = dumps(ro)
assert obj.send(user='test', password='passwd', body="test") is False
# Fall back to original template
request.content = dumps(response_obj)
request.status_code = requests.codes.ok
# our room list is empty so we'll have retrieved the joined_list
# as our backup
assert obj.send(user='test', password='passwd', body="test") is True
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_matrix_plugin_fetch(mock_post, mock_get):
"""
API: NotifyMatrix() Server Fetch/API Tests
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
response_obj = {
'room_id': '!abc123:localhost',
'room_alias': '#abc123:localhost',
'joined_rooms': ['!abc123:localhost', '!def456:localhost'],
# Login details
'access_token': 'abcd1234',
'user_id': '@apprise:localhost',
'home_server': 'localhost',
}
def fetch_failed(url, *args, **kwargs):
# Default configuration
request = mock.Mock()
request.status_code = requests.codes.ok
request.content = dumps(response_obj)
if url.find('/rooms/') > -1:
# over-ride on room query
request.status_code = 403
request.content = dumps({
u'errcode': u'M_UNKNOWN',
u'error': u'Internal server error',
})
return request
mock_get.side_effect = fetch_failed
mock_post.side_effect = fetch_failed
obj = plugins.NotifyMatrix(user='user', password='passwd', thumbnail=True)
assert isinstance(obj, plugins.NotifyMatrix) is True
# We would hve failed to send our image notification
assert obj.send(user='test', password='passwd', body="test") is False
# Do the same query with no images to fetch
asset = AppriseAsset(image_path_mask=False, image_url_mask=False)
obj = plugins.NotifyMatrix(user='user', password='passwd', asset=asset)
assert isinstance(obj, plugins.NotifyMatrix) is True
# We would hve failed to send our notification
assert obj.send(user='test', password='passwd', body="test") is False
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
response_obj = {
# Registration
'access_token': 'abcd1234',
'user_id': '@apprise:localhost',
'home_server': 'localhost',
# For room joining
'room_id': '!abc123:localhost',
}
# Default configuration
mock_get.side_effect = None
mock_post.side_effect = None
request = mock.Mock()
request.status_code = requests.codes.ok
request.content = dumps(response_obj)
mock_post.return_value = request
mock_get.return_value = request
obj = plugins.NotifyMatrix()
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
assert obj._register() is True
assert obj.access_token is not None
# Cause retries
request.status_code = 429
request.content = dumps({
'retry_after_ms': 1,
})
code, response = obj._fetch('/retry/apprise/unit/test')
assert code is False
request.content = dumps({
'error': {
'retry_after_ms': 1,
}
})
code, response = obj._fetch('/retry/apprise/unit/test')
assert code is False
request.content = dumps({
'error': {}
})
code, response = obj._fetch('/retry/apprise/unit/test')
assert code is False
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_matrix_plugin_auth(mock_post, mock_get):
"""
API: NotifyMatrix() Server Authentication
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
response_obj = {
# Registration
'access_token': 'abcd1234',
'user_id': '@apprise:localhost',
'home_server': 'localhost',
}
# Default configuration
request = mock.Mock()
request.status_code = requests.codes.ok
request.content = dumps(response_obj)
mock_post.return_value = request
mock_get.return_value = request
obj = plugins.NotifyMatrix()
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
# logging out without an access_token is silently a success
assert obj._logout() is True
assert obj.access_token is None
assert obj._register() is True
assert obj.access_token is not None
# Logging in is silently treated as a success because we
# already had success registering
assert obj._login() is True
assert obj.access_token is not None
# However if we log out
assert obj._logout() is True
assert obj.access_token is None
# And set ourselves up for failure
request.status_code = 403
assert obj._login() is False
assert obj.access_token is None
# Reset our token
obj.access_token = None
# Adjust our response to be invalid - missing access_token in response
request.status_code = requests.codes.ok
ro = response_obj.copy()
del ro['access_token']
request.content = dumps(ro)
# Our registration will fail now
assert obj._register() is False
assert obj.access_token is None
# So will login
obj = plugins.NotifyMatrix(user='user', password='password')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj._login() is False
assert obj.access_token is None
# Adjust our response to be invalid - invalid json response
request.content = "{"
# Our registration will fail now
assert obj._register() is False
assert obj.access_token is None
request.status_code = requests.codes.ok
request.content = dumps(response_obj)
assert obj._register() is True
assert obj.access_token is not None
# Test logoff when getting a 403 error
request.status_code = 403
assert obj._logout() is False
assert obj.access_token is not None
request.status_code = requests.codes.ok
request.content = dumps(response_obj)
assert obj._register() is True
assert obj.access_token is not None
request.status_code = 403
request.content = dumps({
u'errcode': u'M_UNKNOWN_TOKEN',
u'error': u'Access Token unknown or expired',
})
# Test logoff when getting a 403 error; but if we have the right error
# code in the response, then we return a True
assert obj._logout() is True
assert obj.access_token is None
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_matrix_plugin_rooms(mock_post, mock_get):
"""
API: NotifyMatrix() Room Testing
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
response_obj = {
# Registration
'access_token': 'abcd1234',
'user_id': '@apprise:localhost',
'home_server': 'localhost',
# For joined_room response
'joined_rooms': ['!abc123:localhost', '!def456:localhost'],
# For room joining
'room_id': '!abc123:localhost',
}
# Default configuration
request = mock.Mock()
request.status_code = requests.codes.ok
request.content = dumps(response_obj)
mock_post.return_value = request
mock_get.return_value = request
obj = plugins.NotifyMatrix()
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
# Can't get room listing if we're not connnected
assert obj._room_join('#abc123') is None
assert obj._register() is True
assert obj.access_token is not None
assert obj._room_join('!abc123') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_join('!abc123:localhost') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_join('abc123') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_join('abc123:localhost') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_join('#abc123:localhost') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_join('%') is None
assert obj._room_join(None) is None
# 403 response; this will push for a room creation for alias based rooms
# and these will fail
request.status_code = 403
obj._room_cache = {}
assert obj._room_join('!abc123') is None
obj._room_cache = {}
assert obj._room_join('!abc123:localhost') is None
obj._room_cache = {}
assert obj._room_join('abc123') is None
obj._room_cache = {}
assert obj._room_join('abc123:localhost') is None
obj._room_cache = {}
assert obj._room_join('#abc123:localhost') is None
# Room creation
request.status_code = requests.codes.ok
obj = plugins.NotifyMatrix()
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
# Can't get room listing if we're not connnected
assert obj._room_create('#abc123') is None
assert obj._register() is True
assert obj.access_token is not None
# You can't add room_id's, they must be aliases
assert obj._room_create('!abc123') is None
assert obj._room_create('!abc123:localhost') is None
obj._room_cache = {}
assert obj._room_create('abc123') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_create('abc123:localhost') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_create('#abc123:localhost') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_create('%') is None
assert obj._room_create(None) is None
# 403 response; this will push for a room creation for alias based rooms
# and these will fail
request.status_code = 403
obj._room_cache = {}
assert obj._room_create('abc123') is None
obj._room_cache = {}
assert obj._room_create('abc123:localhost') is None
obj._room_cache = {}
assert obj._room_create('#abc123:localhost') is None
request.status_code = 403
request.content = dumps({
u'errcode': u'M_ROOM_IN_USE',
u'error': u'Room alias already taken',
})
obj._room_cache = {}
# This causes us to look up a channel ID if we get a ROOM_IN_USE response
assert obj._room_create('#abc123:localhost') is None
# Room detection
request.status_code = requests.codes.ok
request.content = dumps(response_obj)
obj = plugins.NotifyMatrix()
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
# No rooms if we're not connected
response = obj._joined_rooms()
assert isinstance(response, list) is True
assert len(response) == 0
# register our account
assert obj._register() is True
assert obj.access_token is not None
response = obj._joined_rooms()
assert isinstance(response, list) is True
assert len(response) == len(response_obj['joined_rooms'])
for r in response:
assert r in response_obj['joined_rooms']
request.status_code = 403
response = obj._joined_rooms()
assert isinstance(response, list) is True
assert len(response) == 0
# Room id lookup
request.status_code = requests.codes.ok
obj = plugins.NotifyMatrix()
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
# Can't get room listing if we're not connnected
assert obj._room_id('#abc123') is None
assert obj._register() is True
assert obj.access_token is not None
# You can't add room_id's, they must be aliases
assert obj._room_id('!abc123') is None
assert obj._room_id('!abc123:localhost') is None
obj._room_cache = {}
assert obj._room_id('abc123') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_id('abc123:localhost') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_id('#abc123:localhost') == response_obj['room_id']
obj._room_cache = {}
assert obj._room_id('%') is None
assert obj._room_id(None) is None
# If we can't look the code up, we return None
request.status_code = 403
obj._room_cache = {}
assert obj._room_id('#abc123:localhost') is None

View File

@ -150,6 +150,12 @@ def test_notify_base():
# then other
assert elapsed < 0.5
# Force a throttle time
start_time = default_timer()
nb.throttle(wait=0.5)
elapsed = default_timer() - start_time
assert elapsed > 0.5 and elapsed < 1.5
# our NotifyBase wasn't initialized with an ImageSize so this will fail
assert nb.image_url(notify_type=NotifyType.INFO) is None
assert nb.image_path(notify_type=NotifyType.INFO) is None

View File

@ -514,48 +514,24 @@ TEST_URLS = (
('matrixs://', {
'instance': None,
}),
# No token
('matrix://localhost', {
'instance': TypeError,
}),
('matrix://user@localhost', {
'instance': TypeError,
}),
('matrix://localhost/%s' % ('a' * 64), {
# treats it as a anonymous user to register
'instance': plugins.NotifyMatrix,
# response is false because we have nothing to notify
'response': False,
}),
# Name and token
('matrix://user@localhost/%s' % ('a' * 64), {
'instance': plugins.NotifyMatrix,
}),
# port and token (secure)
('matrixs://localhost:9000/%s' % ('a' * 64), {
'instance': plugins.NotifyMatrix,
}),
# Name, port, token and slack mode
('matrix://user@localhost:9000/%s?mode=slack' % ('a' * 64), {
'instance': plugins.NotifyMatrix,
}),
# Name, port, token and matrix mode
('matrix://user@localhost:9000/%s?mode=matrix' % ('a' * 64), {
'instance': plugins.NotifyMatrix,
}),
# Name, port, token and invalid mode
('matrix://user@localhost:9000/%s?mode=foo' % ('a' * 64), {
'instance': TypeError,
}),
('matrix://user@localhost:9000/%s?mode=slack' % ('a' * 64), {
('matrix://user:pass@localhost/#room1/#room2/#room3', {
'instance': plugins.NotifyMatrix,
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('matrix://user@localhost:9000/%s?mode=slack' % ('a' * 64), {
('matrix://user:pass@localhost/#room1/#room2/!room1', {
'instance': plugins.NotifyMatrix,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('matrix://user@localhost:9000/%s?mode=slack' % ('a' * 64), {
('matrix://user:pass@localhost:1234/#room', {
'instance': plugins.NotifyMatrix,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them