mirror of
https://github.com/caronc/apprise.git
synced 2024-12-01 12:34:35 +01:00
1276 lines
42 KiB
Python
1276 lines
42 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
|
|
# All rights reserved.
|
|
#
|
|
# This code is licensed under the MIT License.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files(the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions :
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
# THE SOFTWARE.
|
|
|
|
from __future__ import print_function
|
|
import re
|
|
import sys
|
|
import six
|
|
import pytest
|
|
import requests
|
|
import mock
|
|
from os import chmod
|
|
from os import getuid
|
|
from os.path import dirname
|
|
|
|
from apprise import Apprise
|
|
from apprise import AppriseAsset
|
|
from apprise import NotifyBase
|
|
from apprise import NotifyType
|
|
from apprise import NotifyFormat
|
|
from apprise import NotifyImageSize
|
|
from apprise import __version__
|
|
from apprise import URLBase
|
|
from apprise import PrivacyMode
|
|
|
|
from apprise.plugins import SCHEMA_MAP
|
|
from apprise.plugins import __load_matrix
|
|
from apprise.plugins import __reset_matrix
|
|
from apprise.utils import parse_list
|
|
import inspect
|
|
|
|
# Disable logging for a cleaner testing output
|
|
import logging
|
|
logging.disable(logging.CRITICAL)
|
|
|
|
|
|
def test_apprise():
|
|
"""
|
|
API: Apprise() object
|
|
|
|
"""
|
|
# Caling load matix a second time which is an internal function causes it
|
|
# to skip over content already loaded into our matrix and thefore accesses
|
|
# other if/else parts of the code that aren't otherwise called
|
|
__load_matrix()
|
|
|
|
a = Apprise()
|
|
|
|
# no items
|
|
assert(len(a) == 0)
|
|
|
|
# Apprise object can also be directly tested with 'if' keyword
|
|
# No entries results in a False response
|
|
assert(not a)
|
|
|
|
# Create an Asset object
|
|
asset = AppriseAsset(theme='default')
|
|
|
|
# We can load the device using our asset
|
|
a = Apprise(asset=asset)
|
|
|
|
# We can load our servers up front as well
|
|
servers = [
|
|
'faast://abcdefghijklmnop-abcdefg',
|
|
'kodi://kodi.server.local',
|
|
]
|
|
|
|
a = Apprise(servers=servers)
|
|
|
|
# 2 servers loaded
|
|
assert(len(a) == 2)
|
|
|
|
# Apprise object can also be directly tested with 'if' keyword
|
|
# At least one entry results in a True response
|
|
assert(a)
|
|
|
|
# We can retrieve our URLs this way:
|
|
assert(len(a.urls()) == 2)
|
|
|
|
# We can add another server
|
|
assert(
|
|
a.add('mmosts://mattermost.server.local/'
|
|
'3ccdd113474722377935511fc85d3dd4') is True)
|
|
assert(len(a) == 3)
|
|
|
|
# We can pop an object off of our stack by it's indexed value:
|
|
obj = a.pop(0)
|
|
assert(isinstance(obj, NotifyBase) is True)
|
|
assert(len(a) == 2)
|
|
|
|
# We can retrieve elements from our list too by reference:
|
|
assert(isinstance(a[0].url(), six.string_types) is True)
|
|
|
|
# We can iterate over our list too:
|
|
count = 0
|
|
for o in a:
|
|
assert(isinstance(o.url(), six.string_types) is True)
|
|
count += 1
|
|
# verify that we did indeed iterate over each element
|
|
assert(len(a) == count)
|
|
|
|
# We can empty our set
|
|
a.clear()
|
|
assert(len(a) == 0)
|
|
|
|
# An invalid schema
|
|
assert(
|
|
a.add('this is not a parseable url at all') is False)
|
|
assert(len(a) == 0)
|
|
|
|
# An unsupported schema
|
|
assert(
|
|
a.add('invalid://we.just.do.not.support.this.plugin.type') is False)
|
|
assert(len(a) == 0)
|
|
|
|
# A poorly formatted URL
|
|
assert(
|
|
a.add('json://user:@@@:bad?no.good') is False)
|
|
assert(len(a) == 0)
|
|
|
|
# Add a server with our asset we created earlier
|
|
assert(
|
|
a.add('mmosts://mattermost.server.local/'
|
|
'3ccdd113474722377935511fc85d3dd4', asset=asset) is True)
|
|
|
|
# Clear our server listings again
|
|
a.clear()
|
|
|
|
# No servers to notify
|
|
assert(a.notify(title="my title", body="my body") is False)
|
|
|
|
class BadNotification(NotifyBase):
|
|
def __init__(self, **kwargs):
|
|
super(BadNotification, self).__init__(**kwargs)
|
|
|
|
# We fail whenever we're initialized
|
|
raise TypeError()
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
class GoodNotification(NotifyBase):
|
|
def __init__(self, **kwargs):
|
|
super(GoodNotification, self).__init__(
|
|
notify_format=NotifyFormat.HTML, **kwargs)
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
def notify(self, **kwargs):
|
|
# Pretend everything is okay
|
|
return True
|
|
|
|
# Store our bad notification in our schema map
|
|
SCHEMA_MAP['bad'] = BadNotification
|
|
|
|
# Store our good notification in our schema map
|
|
SCHEMA_MAP['good'] = GoodNotification
|
|
|
|
# Just to explain what is happening here, we would have parsed the
|
|
# url properly but failed when we went to go and create an instance
|
|
# of it.
|
|
assert(a.add('bad://localhost') is False)
|
|
assert(len(a) == 0)
|
|
|
|
assert(a.add('good://localhost') is True)
|
|
assert(len(a) == 1)
|
|
|
|
# Bad Notification Type is still allowed as it is presumed the user
|
|
# know's what their doing
|
|
assert(a.notify(
|
|
title="my title", body="my body", notify_type='bad') is True)
|
|
|
|
# No Title/Body combo's
|
|
assert(a.notify(title=None, body=None) is False)
|
|
assert(a.notify(title='', body=None) is False)
|
|
assert(a.notify(title=None, body='') is False)
|
|
|
|
# As long as one is present, we're good
|
|
assert(a.notify(title=None, body='present') is True)
|
|
assert(a.notify(title='present', body=None) is True)
|
|
assert(a.notify(title="present", body="present") is True)
|
|
|
|
# Clear our server listings again
|
|
a.clear()
|
|
|
|
class ThrowNotification(NotifyBase):
|
|
def notify(self, **kwargs):
|
|
# Pretend everything is okay
|
|
raise TypeError()
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
class RuntimeNotification(NotifyBase):
|
|
def notify(self, **kwargs):
|
|
# Pretend everything is okay
|
|
raise RuntimeError()
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
class FailNotification(NotifyBase):
|
|
|
|
def notify(self, **kwargs):
|
|
# Pretend everything is okay
|
|
return False
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
# Store our bad notification in our schema map
|
|
SCHEMA_MAP['throw'] = ThrowNotification
|
|
|
|
# Store our good notification in our schema map
|
|
SCHEMA_MAP['fail'] = FailNotification
|
|
|
|
# Store our good notification in our schema map
|
|
SCHEMA_MAP['runtime'] = RuntimeNotification
|
|
|
|
assert(a.add('runtime://localhost') is True)
|
|
assert(a.add('throw://localhost') is True)
|
|
assert(a.add('fail://localhost') is True)
|
|
assert(len(a) == 3)
|
|
|
|
# Test when our notify both throws an exception and or just
|
|
# simply returns False
|
|
assert(a.notify(title="present", body="present") is False)
|
|
|
|
# Create a Notification that throws an unexected exception
|
|
class ThrowInstantiateNotification(NotifyBase):
|
|
def __init__(self, **kwargs):
|
|
# Pretend everything is okay
|
|
raise TypeError()
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
SCHEMA_MAP['throw'] = ThrowInstantiateNotification
|
|
|
|
# Reset our object
|
|
a.clear()
|
|
assert(len(a) == 0)
|
|
|
|
# Instantiate a bad object
|
|
plugin = a.instantiate(object, tag="bad_object")
|
|
assert plugin is None
|
|
|
|
# Instantiate a good object
|
|
plugin = a.instantiate('good://localhost', tag="good")
|
|
assert(isinstance(plugin, NotifyBase))
|
|
|
|
# Test simple tagging inside of the object
|
|
assert("good" in plugin)
|
|
assert("bad" not in plugin)
|
|
|
|
# the in (__contains__ override) is based on or'ed content; so although
|
|
# 'bad' isn't tagged as being in the plugin, 'good' is, so the return
|
|
# value of this is True
|
|
assert(["bad", "good"] in plugin)
|
|
assert(set(["bad", "good"]) in plugin)
|
|
assert(("bad", "good") in plugin)
|
|
|
|
# We an add already substatiated instances into our Apprise object
|
|
a.add(plugin)
|
|
assert(len(a) == 1)
|
|
|
|
# We can add entries as a list too (to add more then one)
|
|
a.add([plugin, plugin, plugin])
|
|
assert(len(a) == 4)
|
|
|
|
# Reset our object again
|
|
a.clear()
|
|
try:
|
|
a.instantiate('throw://localhost', suppress_exceptions=False)
|
|
assert(False)
|
|
|
|
except TypeError:
|
|
assert(True)
|
|
assert(len(a) == 0)
|
|
|
|
assert(a.instantiate(
|
|
'throw://localhost', suppress_exceptions=True) is None)
|
|
assert(len(a) == 0)
|
|
|
|
#
|
|
# We rince and repeat the same tests as above, however we do them
|
|
# using the dict version
|
|
#
|
|
|
|
# Reset our object
|
|
a.clear()
|
|
assert(len(a) == 0)
|
|
|
|
# Instantiate a good object
|
|
plugin = a.instantiate({
|
|
'schema': 'good',
|
|
'host': 'localhost'}, tag="good")
|
|
assert(isinstance(plugin, NotifyBase))
|
|
|
|
# Test simple tagging inside of the object
|
|
assert("good" in plugin)
|
|
assert("bad" not in plugin)
|
|
|
|
# the in (__contains__ override) is based on or'ed content; so although
|
|
# 'bad' isn't tagged as being in the plugin, 'good' is, so the return
|
|
# value of this is True
|
|
assert(["bad", "good"] in plugin)
|
|
assert(set(["bad", "good"]) in plugin)
|
|
assert(("bad", "good") in plugin)
|
|
|
|
# We an add already substatiated instances into our Apprise object
|
|
a.add(plugin)
|
|
assert(len(a) == 1)
|
|
|
|
# We can add entries as a list too (to add more then one)
|
|
a.add([plugin, plugin, plugin])
|
|
assert(len(a) == 4)
|
|
|
|
# Reset our object again
|
|
a.clear()
|
|
try:
|
|
a.instantiate({
|
|
'schema': 'throw',
|
|
'host': 'localhost'}, suppress_exceptions=False)
|
|
assert(False)
|
|
|
|
except TypeError:
|
|
assert(True)
|
|
assert(len(a) == 0)
|
|
|
|
assert(a.instantiate({
|
|
'schema': 'throw',
|
|
'host': 'localhost'}, suppress_exceptions=True) is None)
|
|
assert(len(a) == 0)
|
|
|
|
# Privacy Print
|
|
# PrivacyMode.Secret always returns the same thing to avoid guessing
|
|
assert URLBase.pprint(
|
|
None, privacy=True, mode=PrivacyMode.Secret) == '****'
|
|
assert URLBase.pprint(
|
|
42, privacy=True, mode=PrivacyMode.Secret) == '****'
|
|
assert URLBase.pprint(
|
|
object, privacy=True, mode=PrivacyMode.Secret) == '****'
|
|
assert URLBase.pprint(
|
|
"", privacy=True, mode=PrivacyMode.Secret) == '****'
|
|
assert URLBase.pprint(
|
|
"a", privacy=True, mode=PrivacyMode.Secret) == '****'
|
|
assert URLBase.pprint(
|
|
"ab", privacy=True, mode=PrivacyMode.Secret) == '****'
|
|
assert URLBase.pprint(
|
|
"abcdefghijk", privacy=True, mode=PrivacyMode.Secret) == '****'
|
|
|
|
# PrivacyMode.Outer
|
|
assert URLBase.pprint(
|
|
None, privacy=True, mode=PrivacyMode.Outer) == ''
|
|
assert URLBase.pprint(
|
|
42, privacy=True, mode=PrivacyMode.Outer) == ''
|
|
assert URLBase.pprint(
|
|
object, privacy=True, mode=PrivacyMode.Outer) == ''
|
|
assert URLBase.pprint(
|
|
"", privacy=True, mode=PrivacyMode.Outer) == ''
|
|
assert URLBase.pprint(
|
|
"a", privacy=True, mode=PrivacyMode.Outer) == 'a...a'
|
|
assert URLBase.pprint(
|
|
"ab", privacy=True, mode=PrivacyMode.Outer) == 'a...b'
|
|
assert URLBase.pprint(
|
|
"abcdefghijk", privacy=True, mode=PrivacyMode.Outer) == 'a...k'
|
|
|
|
# PrivacyMode.Tail
|
|
assert URLBase.pprint(
|
|
None, privacy=True, mode=PrivacyMode.Tail) == ''
|
|
assert URLBase.pprint(
|
|
42, privacy=True, mode=PrivacyMode.Tail) == ''
|
|
assert URLBase.pprint(
|
|
object, privacy=True, mode=PrivacyMode.Tail) == ''
|
|
assert URLBase.pprint(
|
|
"", privacy=True, mode=PrivacyMode.Tail) == ''
|
|
assert URLBase.pprint(
|
|
"a", privacy=True, mode=PrivacyMode.Tail) == '...a'
|
|
assert URLBase.pprint(
|
|
"ab", privacy=True, mode=PrivacyMode.Tail) == '...ab'
|
|
assert URLBase.pprint(
|
|
"abcdefghijk", privacy=True, mode=PrivacyMode.Tail) == '...hijk'
|
|
|
|
|
|
@mock.patch('requests.get')
|
|
@mock.patch('requests.post')
|
|
def test_apprise_tagging(mock_post, mock_get):
|
|
"""
|
|
API: Apprise() object tagging functionality
|
|
|
|
"""
|
|
|
|
# A request
|
|
robj = mock.Mock()
|
|
setattr(robj, 'raw', mock.Mock())
|
|
# Allow raw.read() calls
|
|
robj.raw.read.return_value = ''
|
|
robj.text = ''
|
|
robj.content = ''
|
|
mock_get.return_value = robj
|
|
mock_post.return_value = robj
|
|
|
|
# Simulate a successful notification
|
|
mock_get.return_value.status_code = requests.codes.ok
|
|
mock_post.return_value.status_code = requests.codes.ok
|
|
|
|
# Create our object
|
|
a = Apprise()
|
|
|
|
# An invalid addition can't add the tag
|
|
assert(a.add('averyinvalidschema://localhost', tag='uhoh') is False)
|
|
assert(a.add({
|
|
'schema': 'averyinvalidschema',
|
|
'host': 'localhost'}, tag='uhoh') is False)
|
|
|
|
# Add entry and assign it to a tag called 'awesome'
|
|
assert(a.add('json://localhost/path1/', tag='awesome') is True)
|
|
assert(a.add({
|
|
'schema': 'json',
|
|
'host': 'localhost',
|
|
'fullpath': '/path1/'}, tag='awesome') is True)
|
|
|
|
# Add another notification and assign it to a tag called 'awesome'
|
|
# and another tag called 'local'
|
|
assert(a.add('json://localhost/path2/', tag=['mmost', 'awesome']) is True)
|
|
|
|
# notify the awesome tag; this would notify both services behind the
|
|
# scenes
|
|
assert(a.notify(title="my title", body="my body", tag='awesome') is True)
|
|
|
|
# notify all of the tags
|
|
assert(a.notify(
|
|
title="my title", body="my body", tag=['awesome', 'mmost']) is True)
|
|
|
|
# When we query against our loaded notifications for a tag that simply
|
|
# isn't assigned to anything, we return None. None (different then False)
|
|
# tells us that we litterally had nothing to query. We didn't fail...
|
|
# but we also didn't do anything...
|
|
assert(a.notify(
|
|
title="my title", body="my body", tag='missing') is None)
|
|
|
|
# Now to test the ability to and and/or notifications
|
|
a = Apprise()
|
|
|
|
# Add a tag by tuple
|
|
assert(a.add('json://localhost/tagA/', tag=("TagA", )) is True)
|
|
# Add 2 tags by string
|
|
assert(a.add('json://localhost/tagAB/', tag="TagA, TagB") is True)
|
|
# Add a tag using a set
|
|
assert(a.add('json://localhost/tagB/', tag=set(["TagB"])) is True)
|
|
# Add a tag by string (again)
|
|
assert(a.add('json://localhost/tagC/', tag="TagC") is True)
|
|
# Add 2 tags using a list
|
|
assert(a.add('json://localhost/tagCD/', tag=["TagC", "TagD"]) is True)
|
|
# Add a tag by string (again)
|
|
assert(a.add('json://localhost/tagD/', tag="TagD") is True)
|
|
# add a tag set by set (again)
|
|
assert(a.add('json://localhost/tagCDE/',
|
|
tag=set(["TagC", "TagD", "TagE"])) is True)
|
|
|
|
# Expression: TagC and TagD
|
|
# Matches the following only:
|
|
# - json://localhost/tagCD/
|
|
# - json://localhost/tagCDE/
|
|
assert(a.notify(
|
|
title="my title", body="my body", tag=[('TagC', 'TagD')]) is True)
|
|
|
|
# Expression: (TagY and TagZ) or TagX
|
|
# Matches nothing, None is returned in this case
|
|
assert(a.notify(
|
|
title="my title", body="my body",
|
|
tag=[('TagY', 'TagZ'), 'TagX']) is None)
|
|
|
|
# Expression: (TagY and TagZ) or TagA
|
|
# Matches the following only:
|
|
# - json://localhost/tagAB/
|
|
assert(a.notify(
|
|
title="my title", body="my body",
|
|
tag=[('TagY', 'TagZ'), 'TagA']) is True)
|
|
|
|
# Expression: (TagE and TagD) or TagB
|
|
# Matches the following only:
|
|
# - json://localhost/tagCDE/
|
|
# - json://localhost/tagAB/
|
|
# - json://localhost/tagB/
|
|
assert(a.notify(
|
|
title="my title", body="my body",
|
|
tag=[('TagE', 'TagD'), 'TagB']) is True)
|
|
|
|
# Garbage Entries; we can't do anything with the tag so we have nothing to
|
|
# notify as a result. So we simply return None
|
|
assert(a.notify(
|
|
title="my title", body="my body",
|
|
tag=[(object, ), ]) is None)
|
|
|
|
|
|
def test_apprise_notify_formats(tmpdir):
|
|
"""
|
|
API: Apprise() TextFormat tests
|
|
|
|
"""
|
|
# Caling load matix a second time which is an internal function causes it
|
|
# to skip over content already loaded into our matrix and thefore accesses
|
|
# other if/else parts of the code that aren't otherwise called
|
|
__load_matrix()
|
|
|
|
a = Apprise()
|
|
|
|
# no items
|
|
assert(len(a) == 0)
|
|
|
|
class TextNotification(NotifyBase):
|
|
# set our default notification format
|
|
notify_format = NotifyFormat.TEXT
|
|
|
|
def __init__(self, **kwargs):
|
|
super(TextNotification, self).__init__()
|
|
|
|
def notify(self, **kwargs):
|
|
# Pretend everything is okay
|
|
return True
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
class HtmlNotification(NotifyBase):
|
|
|
|
# set our default notification format
|
|
notify_format = NotifyFormat.HTML
|
|
|
|
def __init__(self, **kwargs):
|
|
super(HtmlNotification, self).__init__()
|
|
|
|
def notify(self, **kwargs):
|
|
# Pretend everything is okay
|
|
return True
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
class MarkDownNotification(NotifyBase):
|
|
|
|
# set our default notification format
|
|
notify_format = NotifyFormat.MARKDOWN
|
|
|
|
def __init__(self, **kwargs):
|
|
super(MarkDownNotification, self).__init__()
|
|
|
|
def notify(self, **kwargs):
|
|
# Pretend everything is okay
|
|
return True
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
# Store our notifications into our schema map
|
|
SCHEMA_MAP['text'] = TextNotification
|
|
SCHEMA_MAP['html'] = HtmlNotification
|
|
SCHEMA_MAP['markdown'] = MarkDownNotification
|
|
|
|
# Test Markdown; the above calls the markdown because our good://
|
|
# defined plugin above was defined to default to HTML which triggers
|
|
# a markdown to take place if the body_format specified on the notify
|
|
# call
|
|
assert(a.add('html://localhost') is True)
|
|
assert(a.add('html://another.server') is True)
|
|
assert(a.add('html://and.another') is True)
|
|
assert(a.add('text://localhost') is True)
|
|
assert(a.add('text://another.server') is True)
|
|
assert(a.add('text://and.another') is True)
|
|
assert(a.add('markdown://localhost') is True)
|
|
assert(a.add('markdown://another.server') is True)
|
|
assert(a.add('markdown://and.another') is True)
|
|
|
|
assert(len(a) == 9)
|
|
|
|
assert(a.notify(title="markdown", body="## Testing Markdown",
|
|
body_format=NotifyFormat.MARKDOWN) is True)
|
|
|
|
assert(a.notify(title="text", body="Testing Text",
|
|
body_format=NotifyFormat.TEXT) is True)
|
|
|
|
assert(a.notify(title="html", body="<b>HTML</b>",
|
|
body_format=NotifyFormat.HTML) is True)
|
|
|
|
|
|
def test_apprise_asset(tmpdir):
|
|
"""
|
|
API: AppriseAsset() object
|
|
|
|
"""
|
|
a = AppriseAsset(theme=None)
|
|
# Default theme
|
|
assert(a.theme == 'default')
|
|
|
|
a = AppriseAsset(
|
|
theme='dark',
|
|
image_path_mask='/{THEME}/{TYPE}-{XY}{EXTENSION}',
|
|
image_url_mask='http://localhost/{THEME}/{TYPE}-{XY}{EXTENSION}',
|
|
)
|
|
|
|
a.default_html_color = '#abcabc'
|
|
a.html_notify_map[NotifyType.INFO] = '#aaaaaa'
|
|
|
|
assert(a.color('invalid', tuple) == (171, 202, 188))
|
|
assert(a.color(NotifyType.INFO, tuple) == (170, 170, 170))
|
|
|
|
assert(a.color('invalid', int) == 11258556)
|
|
assert(a.color(NotifyType.INFO, int) == 11184810)
|
|
|
|
assert(a.color('invalid', None) == '#abcabc')
|
|
assert(a.color(NotifyType.INFO, None) == '#aaaaaa')
|
|
# None is the default
|
|
assert(a.color(NotifyType.INFO) == '#aaaaaa')
|
|
|
|
# Invalid Type
|
|
try:
|
|
a.color(NotifyType.INFO, dict)
|
|
# We should not get here (exception should be thrown)
|
|
assert(False)
|
|
|
|
except ValueError:
|
|
# The exception we expect since dict is not supported
|
|
assert(True)
|
|
|
|
except Exception:
|
|
# Any other exception is not good
|
|
assert(False)
|
|
|
|
assert(a.image_url(NotifyType.INFO, NotifyImageSize.XY_256) ==
|
|
'http://localhost/dark/info-256x256.png')
|
|
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
must_exist=False) == '/dark/info-256x256.png')
|
|
|
|
# This path doesn't exist so image_raw will fail (since we just
|
|
# randompyl picked it for testing)
|
|
assert(a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None)
|
|
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
must_exist=True) is None)
|
|
|
|
# Create a new object (with our default settings)
|
|
a = AppriseAsset()
|
|
|
|
# Our default configuration can access our file
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
must_exist=True) is not None)
|
|
|
|
assert(a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is not None)
|
|
|
|
# Create a temporary directory
|
|
sub = tmpdir.mkdir("great.theme")
|
|
|
|
# Write a file
|
|
sub.join("{0}-{1}.png".format(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
)).write("the content doesn't matter for testing.")
|
|
|
|
# Create an asset that will reference our file we just created
|
|
a = AppriseAsset(
|
|
theme='great.theme',
|
|
image_path_mask='%s/{THEME}/{TYPE}-{XY}.png' % dirname(sub.strpath),
|
|
)
|
|
|
|
# We'll be able to read file we just created
|
|
assert(a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is not None)
|
|
|
|
# We can retrieve the filename at this point even with must_exist set
|
|
# to True
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
must_exist=True) is not None)
|
|
|
|
# If we make the file un-readable however, we won't be able to read it
|
|
# This test is just showing that we won't throw an exception
|
|
if getuid() == 0:
|
|
# Root always over-rides 0x000 permission settings making the below
|
|
# tests futile
|
|
pytest.skip('The Root user can not run file permission tests.')
|
|
|
|
chmod(dirname(sub.strpath), 0o000)
|
|
assert(a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None)
|
|
|
|
# Our path doesn't exist anymore using this logic
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
must_exist=True) is None)
|
|
|
|
# Return our permission so we don't have any problems with our cleanup
|
|
chmod(dirname(sub.strpath), 0o700)
|
|
|
|
# Our content is retrivable again
|
|
assert(a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is not None)
|
|
|
|
# our file path is accessible again too
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
must_exist=True) is not None)
|
|
|
|
# We do the same test, but set the permission on the file
|
|
chmod(a.image_path(NotifyType.INFO, NotifyImageSize.XY_256), 0o000)
|
|
|
|
# our path will still exist in this case
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
must_exist=True) is not None)
|
|
|
|
# but we will not be able to open it
|
|
assert(a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None)
|
|
|
|
# Restore our permissions
|
|
chmod(a.image_path(NotifyType.INFO, NotifyImageSize.XY_256), 0o640)
|
|
|
|
# Disable all image references
|
|
a = AppriseAsset(image_path_mask=False, image_url_mask=False)
|
|
|
|
# We always return none in these calls now
|
|
assert(a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None)
|
|
assert(a.image_url(NotifyType.INFO, NotifyImageSize.XY_256) is None)
|
|
assert(a.image_path(NotifyType.INFO, NotifyImageSize.XY_256,
|
|
must_exist=False) is None)
|
|
assert(a.image_path(NotifyType.INFO, NotifyImageSize.XY_256,
|
|
must_exist=True) is None)
|
|
|
|
# Test our default extension out
|
|
a = AppriseAsset(
|
|
image_path_mask='/{THEME}/{TYPE}-{XY}{EXTENSION}',
|
|
image_url_mask='http://localhost/{THEME}/{TYPE}-{XY}{EXTENSION}',
|
|
default_extension='.jpeg',
|
|
)
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
must_exist=False) == '/default/info-256x256.jpeg')
|
|
|
|
assert(a.image_url(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256) == 'http://localhost/'
|
|
'default/info-256x256.jpeg')
|
|
|
|
# extension support
|
|
assert(a.image_path(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_128,
|
|
must_exist=False,
|
|
extension='.ico') == '/default/info-128x128.ico')
|
|
|
|
assert(a.image_url(
|
|
NotifyType.INFO,
|
|
NotifyImageSize.XY_256,
|
|
extension='.test') == 'http://localhost/'
|
|
'default/info-256x256.test')
|
|
|
|
|
|
def test_apprise_details():
|
|
"""
|
|
API: Apprise() Details
|
|
|
|
"""
|
|
# Reset our matrix
|
|
__reset_matrix()
|
|
|
|
# This is a made up class that is just used to verify
|
|
class TestDetailNotification(NotifyBase):
|
|
"""
|
|
This class is used to test various configurations supported
|
|
"""
|
|
|
|
# Minimum requirements for a plugin to produce details
|
|
service_name = 'Detail Testing'
|
|
|
|
# The default simple (insecure) protocol (used by NotifyMail)
|
|
protocol = 'details'
|
|
|
|
# Set test_bool flag
|
|
always_true = True
|
|
always_false = False
|
|
|
|
# Define object templates
|
|
templates = (
|
|
'{schema}://{host}',
|
|
'{schema}://{host}:{port}',
|
|
'{schema}://{user}@{host}:{port}',
|
|
'{schema}://{user}:{pass}@{host}:{port}',
|
|
)
|
|
|
|
# Define our tokens; these are the minimum tokens required required to
|
|
# be passed into this function (as arguments). The syntax appends any
|
|
# previously defined in the base package and builds onto them
|
|
template_tokens = dict(NotifyBase.template_tokens, **{
|
|
'notype': {
|
|
# Nothing defined is still valid
|
|
},
|
|
'regex_test01': {
|
|
'name': _('RegexTest'),
|
|
'type': 'string',
|
|
'regex': r'[A-Z0-9]',
|
|
},
|
|
'regex_test02': {
|
|
'name': _('RegexTest'),
|
|
# Support regex options too
|
|
'regex': (r'[A-Z0-9]', 'i'),
|
|
},
|
|
'regex_test03': {
|
|
'name': _('RegexTest'),
|
|
# Support regex option without a second option
|
|
'regex': (r'[A-Z0-9]'),
|
|
},
|
|
'regex_test04': {
|
|
# this entry would just end up getting removed
|
|
'regex': None,
|
|
},
|
|
# List without delimiters (causes defaults to kick in)
|
|
'mylistA': {
|
|
'name': 'fruit',
|
|
'type': 'list:string',
|
|
},
|
|
# A list with a delimiter list
|
|
'mylistB': {
|
|
'name': 'softdrinks',
|
|
'type': 'list:string',
|
|
'delim': ['|', '-'],
|
|
},
|
|
})
|
|
|
|
template_args = dict(NotifyBase.template_args, **{
|
|
# Test _exist_if logic
|
|
'test_exists_if_01': {
|
|
'name': 'Always False',
|
|
'type': 'bool',
|
|
# Provide a default
|
|
'default': False,
|
|
# Base the existance of this key/value entry on the lookup
|
|
# of this class value at runtime. Hence:
|
|
# if not NotifyObject.always_false
|
|
# del this_entry
|
|
#
|
|
'_exists_if': 'always_false',
|
|
},
|
|
# Test _exist_if logic
|
|
'test_exists_if_02': {
|
|
'name': 'Always True',
|
|
'type': 'bool',
|
|
# Provide a default
|
|
'default': False,
|
|
# Base the existance of this key/value entry on the lookup
|
|
# of this class value at runtime. Hence:
|
|
# if not NotifyObject.always_true
|
|
# del this_entry
|
|
#
|
|
'_exists_if': 'always_true',
|
|
},
|
|
})
|
|
|
|
def url(self):
|
|
# Support URL
|
|
return ''
|
|
|
|
def notify(self, **kwargs):
|
|
# Pretend everything is okay (so we don't break other tests)
|
|
return True
|
|
|
|
# Store our good detail notification in our schema map
|
|
SCHEMA_MAP['details'] = TestDetailNotification
|
|
|
|
# Create our Apprise instance
|
|
a = Apprise()
|
|
|
|
# Dictionary response
|
|
assert isinstance(a.details(), dict)
|
|
|
|
# Reset our matrix
|
|
__reset_matrix()
|
|
__load_matrix()
|
|
|
|
|
|
def test_apprise_details_plugin_verification():
|
|
"""
|
|
API: Apprise() Details Plugin Verification
|
|
|
|
"""
|
|
|
|
# Reset our matrix
|
|
__reset_matrix()
|
|
__load_matrix()
|
|
|
|
a = Apprise()
|
|
|
|
# Details object
|
|
details = a.details()
|
|
|
|
# Dictionary response
|
|
assert isinstance(details, dict)
|
|
|
|
# Details object with language defined:
|
|
details = a.details(lang='en')
|
|
|
|
# Dictionary response
|
|
assert isinstance(details, dict)
|
|
|
|
# Details object with unsupported language:
|
|
details = a.details(lang='xx')
|
|
|
|
# Dictionary response
|
|
assert isinstance(details, dict)
|
|
|
|
# Apprise version
|
|
assert 'version' in details
|
|
assert details.get('version') == __version__
|
|
|
|
# Defined schemas identify each plugin
|
|
assert 'schemas' in details
|
|
assert isinstance(details.get('schemas'), list)
|
|
|
|
# We have an entry per defined plugin
|
|
assert 'asset' in details
|
|
assert isinstance(details.get('asset'), dict)
|
|
assert 'app_id' in details['asset']
|
|
assert 'app_desc' in details['asset']
|
|
assert 'default_extension' in details['asset']
|
|
assert 'theme' in details['asset']
|
|
assert 'image_path_mask' in details['asset']
|
|
assert 'image_url_mask' in details['asset']
|
|
assert 'image_url_logo' in details['asset']
|
|
|
|
# Valid Type Regular Expression Checker
|
|
# Case Sensitive and MUST match the following:
|
|
is_valid_type_re = re.compile(
|
|
r'((choice|list):)?(string|bool|int|float)')
|
|
|
|
# match tokens found in templates so we can cross reference them back
|
|
# to see if they have a matching argument
|
|
template_token_re = re.compile(r'{([^}]+)}[^{]*?(?=$|{)')
|
|
|
|
# Define acceptable map_to arguments that can be tied in with the
|
|
# kwargs function definitions.
|
|
valid_kwargs = set([
|
|
# URL prepared kwargs
|
|
'user', 'password', 'port', 'host', 'schema', 'fullpath',
|
|
# URLBase and NotifyBase args:
|
|
'verify', 'format', 'overflow',
|
|
])
|
|
|
|
# Valid Schema Entries:
|
|
valid_schema_keys = (
|
|
'name', 'private', 'required', 'type', 'values', 'min', 'max',
|
|
'regex', 'default', 'list', 'delim', 'prefix', 'map_to', 'alias_of',
|
|
)
|
|
for entry in details['schemas']:
|
|
|
|
# Track the map_to entries (if specified); We need to make sure that
|
|
# these properly map back
|
|
map_to_entries = set()
|
|
|
|
# Track the alias_of entries
|
|
map_to_aliases = set()
|
|
|
|
# A Service Name MUST be defined
|
|
assert 'service_name' in entry
|
|
assert isinstance(entry['service_name'], six.string_types)
|
|
|
|
# Acquire our protocols
|
|
protocols = parse_list(
|
|
entry['protocols'], entry['secure_protocols'])
|
|
|
|
# At least one schema/protocol MUST be defined
|
|
assert len(protocols) > 0
|
|
|
|
# our details
|
|
assert 'details' in entry
|
|
assert isinstance(entry['details'], dict)
|
|
|
|
# All schema details should include args
|
|
for section in ['kwargs', 'args', 'tokens']:
|
|
assert section in entry['details']
|
|
assert isinstance(entry['details'][section], dict)
|
|
|
|
for key, arg in entry['details'][section].items():
|
|
# Validate keys (case-sensitive)
|
|
assert len([k for k in arg.keys()
|
|
if k not in valid_schema_keys]) == 0
|
|
|
|
# Test our argument
|
|
assert isinstance(arg, dict)
|
|
|
|
if 'alias_of' not in arg:
|
|
# Minimum requirement of an argument
|
|
assert 'name' in arg
|
|
assert isinstance(arg['name'], six.string_types)
|
|
|
|
assert 'type' in arg
|
|
assert isinstance(arg['type'], six.string_types)
|
|
assert is_valid_type_re.match(arg['type']) is not None
|
|
|
|
if 'min' in arg:
|
|
assert arg['type'].endswith('float') \
|
|
or arg['type'].endswith('int')
|
|
assert isinstance(arg['min'], (int, float))
|
|
|
|
if 'max' in arg:
|
|
# If a min and max was specified, at least check
|
|
# to confirm the min is less then the max
|
|
assert arg['min'] < arg['max']
|
|
|
|
if 'max' in arg:
|
|
assert arg['type'].endswith('float') \
|
|
or arg['type'].endswith('int')
|
|
assert isinstance(arg['max'], (int, float))
|
|
|
|
if 'private' in arg:
|
|
assert isinstance(arg['private'], bool)
|
|
|
|
if 'required' in arg:
|
|
assert isinstance(arg['required'], bool)
|
|
|
|
if 'prefix' in arg:
|
|
assert isinstance(arg['prefix'], six.string_types)
|
|
if section == 'kwargs':
|
|
# The only acceptable prefix types for kwargs
|
|
assert arg['prefix'] in ('+', '-')
|
|
|
|
else:
|
|
# kwargs requires that the 'prefix' is defined
|
|
assert section != 'kwargs'
|
|
|
|
if 'map_to' in arg:
|
|
# must be a string
|
|
assert isinstance(arg['map_to'], six.string_types)
|
|
# Track our map_to object
|
|
map_to_entries.add(arg['map_to'])
|
|
|
|
else:
|
|
map_to_entries.add(key)
|
|
|
|
# Some verification
|
|
if arg['type'].startswith('choice'):
|
|
|
|
# choice:bool is redundant and should be swapped to
|
|
# just bool
|
|
assert not arg['type'].endswith('bool')
|
|
|
|
# Choices require that a values list is provided
|
|
assert 'values' in arg
|
|
assert isinstance(arg['values'], (list, tuple))
|
|
assert len(arg['values']) > 0
|
|
|
|
# Test default
|
|
if 'default' in arg:
|
|
# if a default is provided on a choice object,
|
|
# it better be in the list of values
|
|
assert arg['default'] in arg['values']
|
|
|
|
if arg['type'].startswith('bool'):
|
|
# Boolean choices are less restrictive but require a
|
|
# default value
|
|
assert 'default' in arg
|
|
assert isinstance(arg['default'], bool)
|
|
|
|
if 'regex' in arg:
|
|
# Regex must ALWAYS be in the format (regex, option)
|
|
assert isinstance(arg['regex'], (tuple, list))
|
|
assert len(arg['regex']) == 2
|
|
assert isinstance(arg['regex'][0], six.string_types)
|
|
assert arg['regex'][1] is None or isinstance(
|
|
arg['regex'][1], six.string_types)
|
|
|
|
# Compile the regular expression to verify that it is
|
|
# valid
|
|
try:
|
|
re.compile(arg['regex'][0])
|
|
except:
|
|
assert '{} is an invalid regex'\
|
|
.format(arg['regex'][0])
|
|
|
|
# Regex should never start and/or end with ^/$; leave
|
|
# that up to the user making use of the regex instead
|
|
assert re.match(r'^[()\s]*\^', arg['regex'][0]) is None
|
|
assert re.match(r'[()\s$]*\$', arg['regex'][0]) is None
|
|
|
|
if arg['type'].startswith('list'):
|
|
# Delimiters MUST be defined
|
|
assert 'delim' in arg
|
|
assert isinstance(arg['delim'], (list, tuple))
|
|
assert len(arg['delim']) > 0
|
|
|
|
else: # alias_of is in the object
|
|
# must be a string
|
|
assert isinstance(arg['alias_of'], six.string_types)
|
|
# Track our alias_of object
|
|
map_to_aliases.add(arg['alias_of'])
|
|
# 2 entries (name, and alias_of only!)
|
|
assert len(entry['details'][section][key]) == 1
|
|
|
|
if six.PY2:
|
|
# inspect our object
|
|
# getargspec() is depricated in Python v3
|
|
spec = inspect.getargspec(SCHEMA_MAP[protocols[0]].__init__)
|
|
|
|
function_args = \
|
|
(set(parse_list(spec.keywords)) - set(['kwargs'])) \
|
|
| (set(spec.args) - set(['self'])) | valid_kwargs
|
|
else:
|
|
# Python v3+ uses getfullargspec()
|
|
spec = inspect.getfullargspec(SCHEMA_MAP[protocols[0]].__init__)
|
|
|
|
function_args = \
|
|
(set(parse_list(spec.varkw)) - set(['kwargs'])) \
|
|
| (set(spec.args) - set(['self'])) | valid_kwargs
|
|
|
|
# Iterate over our map_to_entries and make sure that everything
|
|
# maps to a function argument
|
|
for arg in map_to_entries:
|
|
if arg not in function_args:
|
|
# This print statement just makes the error easier to
|
|
# troubleshoot
|
|
print('{}:// template/arg/func reference missing error.'
|
|
.format(protocols[0]))
|
|
assert arg in function_args
|
|
|
|
# Iterate over all of the function arguments and make sure that
|
|
# it maps back to a key
|
|
function_args -= valid_kwargs
|
|
for arg in function_args:
|
|
if arg not in map_to_entries:
|
|
# This print statement just makes the error easier to
|
|
# troubleshoot
|
|
print('{}:// template/func/arg reference missing error.'
|
|
.format(protocols[0]))
|
|
assert arg in map_to_entries
|
|
|
|
# Iterate over our map_to_aliases and make sure they were defined in
|
|
# either the as a token or arg
|
|
for arg in map_to_aliases:
|
|
assert arg in set(entry['details']['args'].keys()) \
|
|
| set(entry['details']['tokens'].keys())
|
|
|
|
# Template verification
|
|
assert 'templates' in entry['details']
|
|
assert isinstance(entry['details']['templates'], (set, tuple, list))
|
|
|
|
# Iterate over our templates and parse our arguments
|
|
for template in entry['details']['templates']:
|
|
# Ensure we've properly opened and closed all of our tokens
|
|
assert template.count('{') == template.count('}')
|
|
|
|
expected_tokens = template.count('}')
|
|
args = template_token_re.findall(template)
|
|
assert expected_tokens == len(args)
|
|
|
|
# Build a cross reference set of our current defined objects
|
|
defined_tokens = set()
|
|
for key, arg in entry['details']['tokens'].items():
|
|
defined_tokens.add(key)
|
|
if 'alias_of' in arg:
|
|
defined_tokens.add(arg['alias_of'])
|
|
|
|
# We want to make sure all of our defined tokens have been
|
|
# accounted for in at least one defined template
|
|
for arg in args:
|
|
assert arg in set(entry['details']['args'].keys()) \
|
|
| set(entry['details']['tokens'].keys())
|
|
|
|
# The reverse of the above; make sure that each entry defined
|
|
# in the template_tokens is accounted for in at least one of
|
|
# the defined templates
|
|
assert arg in defined_tokens
|
|
|
|
|
|
def test_notify_matrix_dynamic_importing(tmpdir):
|
|
"""
|
|
API: Apprise() Notify Matrix Importing
|
|
|
|
"""
|
|
|
|
# Make our new path valid
|
|
suite = tmpdir.mkdir("apprise_notify_test_suite")
|
|
suite.join("__init__.py").write('')
|
|
|
|
module_name = 'badnotify'
|
|
|
|
# Update our path to point to our new test suite
|
|
sys.path.insert(0, str(suite))
|
|
|
|
# Create a base area to work within
|
|
base = suite.mkdir(module_name)
|
|
base.join("__init__.py").write('')
|
|
|
|
# Test no app_id
|
|
base.join('NotifyBadFile1.py').write(
|
|
"""
|
|
class NotifyBadFile1(object):
|
|
pass""")
|
|
|
|
# No class of the same name
|
|
base.join('NotifyBadFile2.py').write(
|
|
"""
|
|
class BadClassName(object):
|
|
pass""")
|
|
|
|
# Exception thrown
|
|
base.join('NotifyBadFile3.py').write("""raise ImportError()""")
|
|
|
|
# Utilizes a schema:// already occupied (as string)
|
|
base.join('NotifyGoober.py').write(
|
|
"""
|
|
from apprise import NotifyBase
|
|
class NotifyGoober(NotifyBase):
|
|
# This class tests the fact we have a new class name, but we're
|
|
# trying to over-ride items previously used
|
|
|
|
# The default simple (insecure) protocol (used by NotifyMail)
|
|
protocol = 'mailto'
|
|
|
|
# The default secure protocol (used by NotifyMail)
|
|
secure_protocol = 'mailtos'""")
|
|
|
|
# Utilizes a schema:// already occupied (as tuple)
|
|
base.join('NotifyBugger.py').write("""
|
|
from apprise import NotifyBase
|
|
class NotifyBugger(NotifyBase):
|
|
# This class tests the fact we have a new class name, but we're
|
|
# trying to over-ride items previously used
|
|
|
|
# The default simple (insecure) protocol (used by NotifyMail), the other
|
|
# isn't
|
|
protocol = ('mailto', 'bugger-test' )
|
|
|
|
# The default secure protocol (used by NotifyMail), the other isn't
|
|
secure_protocol = ('mailtos', 'bugger-tests')""")
|
|
|
|
__load_matrix(path=str(base), name=module_name)
|