Dynamic module loading; just drop in plugins now

This commit is contained in:
Chris Caron 2019-03-16 23:35:06 -04:00
parent 2d5fe7ac7a
commit 96064cff25
12 changed files with 797 additions and 711 deletions

View File

@ -23,41 +23,71 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import six
import re
from .ConfigHTTP import ConfigHTTP
from .ConfigFile import ConfigFile
from os import listdir
from os.path import dirname
from os.path import abspath
# Maintains a mapping of all of the configuration services
SCHEMA_MAP = {}
__all__ = [
# Configuration Services
'ConfigFile', 'ConfigHTTP',
]
__all__ = []
# Load our Lookup Matrix
def __load_matrix():
def __load_matrix(path=abspath(dirname(__file__)), name='apprise.config'):
"""
Dynamically load our schema map; this allows us to gracefully
skip over modules we simply don't have the dependencies for.
"""
# Used for the detection of additional Notify Services objects
# The .py extension is optional as we support loading directories too
module_re = re.compile(r'^(?P<name>Config[a-z0-9]+)(\.py)?$', re.I)
thismodule = sys.modules[__name__]
for f in listdir(path):
match = module_re.match(f)
if not match:
# keep going
continue
# to add it's mapping to our hash table
for entry in dir(thismodule):
# Store our notification/plugin name:
plugin_name = match.group('name')
try:
module = __import__(
'{}.{}'.format(name, plugin_name),
globals(), locals(),
fromlist=[plugin_name])
except ImportError:
# No problem, we can't use this object
continue
if not hasattr(module, plugin_name):
# Not a library we can load as it doesn't follow the simple rule
# that the class must bear the same name as the notification
# file itself.
continue
# Get our plugin
plugin = getattr(thismodule, entry)
if not hasattr(plugin, 'app_id'): # pragma: no branch
plugin = getattr(module, plugin_name)
if not hasattr(plugin, 'app_id'):
# Filter out non-notification modules
continue
elif plugin_name in __all__:
# we're already handling this object
continue
# Add our module name to our __all__
__all__.append(plugin_name)
# Ensure we provide the class as the reference to this directory and
# not the module:
globals()[plugin_name] = plugin
# Load protocol(s) if defined
proto = getattr(plugin, 'protocol', None)
if isinstance(proto, six.string_types):
@ -82,6 +112,8 @@ def __load_matrix():
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
return SCHEMA_MAP
# Dynamically build our module
# Dynamically build our schema base
__load_matrix()

View File

@ -1,312 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from .gntp import notifier
from .gntp import errors
from ..NotifyBase import NotifyBase
from ...common import NotifyImageSize
from ...common import NotifyType
# Priorities
class GrowlPriority(object):
LOW = -2
MODERATE = -1
NORMAL = 0
HIGH = 1
EMERGENCY = 2
GROWL_PRIORITIES = (
GrowlPriority.LOW,
GrowlPriority.MODERATE,
GrowlPriority.NORMAL,
GrowlPriority.HIGH,
GrowlPriority.EMERGENCY,
)
GROWL_NOTIFICATION_TYPE = "New Messages"
class NotifyGrowl(NotifyBase):
"""
A wrapper to Growl Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Growl'
# The services URL
service_url = 'http://growl.info/'
# The default protocol
protocol = 'growl'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_growl'
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_72
# Disable throttle rate for Growl requests since they are normally
# local anyway
request_rate_per_sec = 0
# A title can not be used for Growl Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Limit results to just the first 10 line otherwise there is just to much
# content to display
body_max_line_count = 2
# Default Growl Port
default_port = 23053
def __init__(self, priority=None, version=2, **kwargs):
"""
Initialize Growl Object
"""
super(NotifyGrowl, self).__init__(**kwargs)
if not self.port:
self.port = self.default_port
# The Priority of the message
if priority not in GROWL_PRIORITIES:
self.priority = GrowlPriority.NORMAL
else:
self.priority = priority
# Always default the sticky flag to False
self.sticky = False
# Store Version
self.version = version
payload = {
'applicationName': self.app_id,
'notifications': [GROWL_NOTIFICATION_TYPE, ],
'defaultNotifications': [GROWL_NOTIFICATION_TYPE, ],
'hostname': self.host,
'port': self.port,
}
if self.password is not None:
payload['password'] = self.password
self.logger.debug('Growl Registration Payload: %s' % str(payload))
self.growl = notifier.GrowlNotifier(**payload)
try:
self.growl.register()
self.logger.debug(
'Growl server registration completed successfully.'
)
except errors.NetworkError:
self.logger.warning(
'A network error occured sending Growl '
'notification to %s.' % self.host)
raise TypeError(
'A network error occured sending Growl '
'notification to %s.' % self.host)
except errors.AuthError:
self.logger.warning(
'An authentication error occured sending Growl '
'notification to %s.' % self.host)
raise TypeError(
'An authentication error occured sending Growl '
'notification to %s.' % self.host)
except errors.UnsupportedError:
self.logger.warning(
'An unsupported error occured sending Growl '
'notification to %s.' % self.host)
raise TypeError(
'An unsupported error occured sending Growl '
'notification to %s.' % self.host)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Growl Notification
"""
icon = None
if self.version >= 2:
# URL Based
icon = self.image_url(notify_type)
else:
# Raw
icon = self.image_raw(notify_type)
payload = {
'noteType': GROWL_NOTIFICATION_TYPE,
'title': title,
'description': body,
'icon': icon is not None,
'sticky': False,
'priority': self.priority,
}
self.logger.debug('Growl Payload: %s' % str(payload))
# Update icon of payload to be raw data; this is intentionally done
# here after we spit the debug message above (so we don't try to
# print the binary contents of an image
payload['icon'] = icon
# Always call throttle before any remote server i/o is made
self.throttle()
try:
response = self.growl.notify(**payload)
if not isinstance(response, bool):
self.logger.warning(
'Growl notification failed to send with response: %s' %
str(response),
)
else:
self.logger.info('Sent Growl notification.')
except errors.BaseError as e:
# Since Growl servers listen for UDP broadcasts, it's possible
# that you will never get to this part of the code since there is
# no acknowledgement as to whether it accepted what was sent to it
# or not.
# However, if the host/server is unavailable, you will get to this
# point of the code.
self.logger.warning(
'A Connection error occured sending Growl '
'notification to %s.' % self.host)
self.logger.debug('Growl Exception: %s' % str(e))
# Return; we're done
return False
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
GrowlPriority.LOW: 'low',
GrowlPriority.MODERATE: 'moderate',
GrowlPriority.NORMAL: 'normal',
GrowlPriority.HIGH: 'high',
GrowlPriority.EMERGENCY: 'emergency',
}
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'priority':
_map[GrowlPriority.NORMAL] if self.priority not in _map
else _map[self.priority],
'version': self.version,
}
auth = ''
if self.password:
auth = '{password}@'.format(
password=self.quote(self.user, safe=''),
)
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == self.default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
version = None
if 'version' in results['qsd'] and len(results['qsd']['version']):
# Allow the user to specify the version of the protocol to use.
try:
version = int(
NotifyBase.unquote(
results['qsd']['version']).strip().split('.')[0])
except (AttributeError, IndexError, TypeError, ValueError):
NotifyBase.logger.warning(
'An invalid Growl version of "%s" was specified and will '
'be ignored.' % results['qsd']['version']
)
pass
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
_map = {
'l': GrowlPriority.LOW,
'm': GrowlPriority.MODERATE,
'n': GrowlPriority.NORMAL,
'h': GrowlPriority.HIGH,
'e': GrowlPriority.EMERGENCY,
}
try:
results['priority'] = \
_map[results['qsd']['priority'][0].lower()]
except KeyError:
# No priority was set
pass
# Because of the URL formatting, the password is actually where the
# username field is. For this reason, we just preform this small hack
# to make it (the URL) conform correctly. The following strips out the
# existing password entry (if exists) so that it can be swapped with
# the new one we specify.
if results.get('password', None) is None:
results['password'] = results.get('user', None)
if version:
results['version'] = version
return results

View File

@ -23,8 +23,290 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from . import NotifyGrowl
from .gntp import notifier
from .gntp import errors
from ..NotifyBase import NotifyBase
from ...common import NotifyImageSize
from ...common import NotifyType
__all__ = [
'NotifyGrowl',
]
# Priorities
class GrowlPriority(object):
LOW = -2
MODERATE = -1
NORMAL = 0
HIGH = 1
EMERGENCY = 2
GROWL_PRIORITIES = (
GrowlPriority.LOW,
GrowlPriority.MODERATE,
GrowlPriority.NORMAL,
GrowlPriority.HIGH,
GrowlPriority.EMERGENCY,
)
GROWL_NOTIFICATION_TYPE = "New Messages"
class NotifyGrowl(NotifyBase):
"""
A wrapper to Growl Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Growl'
# The services URL
service_url = 'http://growl.info/'
# The default protocol
protocol = 'growl'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_growl'
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_72
# Disable throttle rate for Growl requests since they are normally
# local anyway
request_rate_per_sec = 0
# A title can not be used for Growl Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Limit results to just the first 10 line otherwise there is just to much
# content to display
body_max_line_count = 2
# Default Growl Port
default_port = 23053
def __init__(self, priority=None, version=2, **kwargs):
"""
Initialize Growl Object
"""
super(NotifyGrowl, self).__init__(**kwargs)
if not self.port:
self.port = self.default_port
# The Priority of the message
if priority not in GROWL_PRIORITIES:
self.priority = GrowlPriority.NORMAL
else:
self.priority = priority
# Always default the sticky flag to False
self.sticky = False
# Store Version
self.version = version
payload = {
'applicationName': self.app_id,
'notifications': [GROWL_NOTIFICATION_TYPE, ],
'defaultNotifications': [GROWL_NOTIFICATION_TYPE, ],
'hostname': self.host,
'port': self.port,
}
if self.password is not None:
payload['password'] = self.password
self.logger.debug('Growl Registration Payload: %s' % str(payload))
self.growl = notifier.GrowlNotifier(**payload)
try:
self.growl.register()
self.logger.debug(
'Growl server registration completed successfully.'
)
except errors.NetworkError:
self.logger.warning(
'A network error occured sending Growl '
'notification to %s.' % self.host)
raise TypeError(
'A network error occured sending Growl '
'notification to %s.' % self.host)
except errors.AuthError:
self.logger.warning(
'An authentication error occured sending Growl '
'notification to %s.' % self.host)
raise TypeError(
'An authentication error occured sending Growl '
'notification to %s.' % self.host)
except errors.UnsupportedError:
self.logger.warning(
'An unsupported error occured sending Growl '
'notification to %s.' % self.host)
raise TypeError(
'An unsupported error occured sending Growl '
'notification to %s.' % self.host)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Growl Notification
"""
icon = None
if self.version >= 2:
# URL Based
icon = self.image_url(notify_type)
else:
# Raw
icon = self.image_raw(notify_type)
payload = {
'noteType': GROWL_NOTIFICATION_TYPE,
'title': title,
'description': body,
'icon': icon is not None,
'sticky': False,
'priority': self.priority,
}
self.logger.debug('Growl Payload: %s' % str(payload))
# Update icon of payload to be raw data; this is intentionally done
# here after we spit the debug message above (so we don't try to
# print the binary contents of an image
payload['icon'] = icon
# Always call throttle before any remote server i/o is made
self.throttle()
try:
response = self.growl.notify(**payload)
if not isinstance(response, bool):
self.logger.warning(
'Growl notification failed to send with response: %s' %
str(response),
)
else:
self.logger.info('Sent Growl notification.')
except errors.BaseError as e:
# Since Growl servers listen for UDP broadcasts, it's possible
# that you will never get to this part of the code since there is
# no acknowledgement as to whether it accepted what was sent to it
# or not.
# However, if the host/server is unavailable, you will get to this
# point of the code.
self.logger.warning(
'A Connection error occured sending Growl '
'notification to %s.' % self.host)
self.logger.debug('Growl Exception: %s' % str(e))
# Return; we're done
return False
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
GrowlPriority.LOW: 'low',
GrowlPriority.MODERATE: 'moderate',
GrowlPriority.NORMAL: 'normal',
GrowlPriority.HIGH: 'high',
GrowlPriority.EMERGENCY: 'emergency',
}
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'priority':
_map[GrowlPriority.NORMAL] if self.priority not in _map
else _map[self.priority],
'version': self.version,
}
auth = ''
if self.password:
auth = '{password}@'.format(
password=self.quote(self.user, safe=''),
)
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == self.default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
version = None
if 'version' in results['qsd'] and len(results['qsd']['version']):
# Allow the user to specify the version of the protocol to use.
try:
version = int(
NotifyBase.unquote(
results['qsd']['version']).strip().split('.')[0])
except (AttributeError, IndexError, TypeError, ValueError):
NotifyBase.logger.warning(
'An invalid Growl version of "%s" was specified and will '
'be ignored.' % results['qsd']['version']
)
pass
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
_map = {
'l': GrowlPriority.LOW,
'm': GrowlPriority.MODERATE,
'n': GrowlPriority.NORMAL,
'h': GrowlPriority.HIGH,
'e': GrowlPriority.EMERGENCY,
}
try:
results['priority'] = \
_map[results['qsd']['priority'][0].lower()]
except KeyError:
# No priority was set
pass
# Because of the URL formatting, the password is actually where the
# username field is. For this reason, we just preform this small hack
# to make it (the URL) conform correctly. The following strips out the
# existing password entry (if exists) so that it can be swapped with
# the new one we specify.
if results.get('password', None) is None:
results['password'] = results.get('user', None)
if version:
results['version'] = version
return results

View File

@ -1,144 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
from .pushjet import errors
from .pushjet import pushjet
from ..NotifyBase import NotifyBase
from ...common import NotifyType
PUBLIC_KEY_RE = re.compile(
r'^[a-z0-9]{4}-[a-z0-9]{6}-[a-z0-9]{12}-[a-z0-9]{5}-[a-z0-9]{9}$', re.I)
SECRET_KEY_RE = re.compile(r'^[a-z0-9]{32}$', re.I)
class NotifyPushjet(NotifyBase):
"""
A wrapper for Pushjet Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Pushjet'
# The default protocol
protocol = 'pjet'
# The default secure protocol
secure_protocol = 'pjets'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_pushjet'
# Disable throttle rate for Pushjet requests since they are normally
# local anyway (the remote/online service is no more)
request_rate_per_sec = 0
def __init__(self, secret_key, **kwargs):
"""
Initialize Pushjet Object
"""
super(NotifyPushjet, self).__init__(**kwargs)
# store our key
self.secret_key = secret_key
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Pushjet Notification
"""
# Always call throttle before any remote server i/o is made
self.throttle()
server = "https://" if self.secure else "http://"
server += self.host
if self.port:
server += ":" + str(self.port)
try:
api = pushjet.Api(server)
service = api.Service(secret_key=self.secret_key)
service.send(body, title)
self.logger.info('Sent Pushjet notification.')
except (errors.PushjetError, ValueError) as e:
self.logger.warning('Failed to send Pushjet notification.')
self.logger.debug('Pushjet Exception: %s' % str(e))
return False
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
default_port = 443 if self.secure else 80
return '{schema}://{secret_key}@{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
secret_key=self.quote(self.secret_key, safe=''),
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
Syntax:
pjet://secret_key@hostname
pjet://secret_key@hostname:port
pjets://secret_key@hostname
pjets://secret_key@hostname:port
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
if not results.get('user'):
# a username is required
return None
# Store it as it's value
results['secret_key'] = results.get('user')
return results

View File

@ -23,8 +23,121 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from . import NotifyPushjet
import re
from . import pushjet
__all__ = [
'NotifyPushjet',
]
from ..NotifyBase import NotifyBase
from ...common import NotifyType
PUBLIC_KEY_RE = re.compile(
r'^[a-z0-9]{4}-[a-z0-9]{6}-[a-z0-9]{12}-[a-z0-9]{5}-[a-z0-9]{9}$', re.I)
SECRET_KEY_RE = re.compile(r'^[a-z0-9]{32}$', re.I)
class NotifyPushjet(NotifyBase):
"""
A wrapper for Pushjet Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Pushjet'
# The default protocol
protocol = 'pjet'
# The default secure protocol
secure_protocol = 'pjets'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_pushjet'
# Disable throttle rate for Pushjet requests since they are normally
# local anyway (the remote/online service is no more)
request_rate_per_sec = 0
def __init__(self, secret_key, **kwargs):
"""
Initialize Pushjet Object
"""
super(NotifyPushjet, self).__init__(**kwargs)
# store our key
self.secret_key = secret_key
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Pushjet Notification
"""
# Always call throttle before any remote server i/o is made
self.throttle()
server = "https://" if self.secure else "http://"
server += self.host
if self.port:
server += ":" + str(self.port)
try:
api = pushjet.pushjet.Api(server)
service = api.Service(secret_key=self.secret_key)
service.send(body, title)
self.logger.info('Sent Pushjet notification.')
except (pushjet.errors.PushjetError, ValueError) as e:
self.logger.warning('Failed to send Pushjet notification.')
self.logger.debug('Pushjet Exception: %s' % str(e))
return False
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
default_port = 443 if self.secure else 80
return '{schema}://{secret_key}@{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
secret_key=self.quote(self.secret_key, safe=''),
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
Syntax:
pjet://secret_key@hostname
pjet://secret_key@hostname:port
pjets://secret_key@hostname
pjets://secret_key@hostname:port
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
if not results.get('user'):
# a username is required
return None
# Store it as it's value
results['secret_key'] = results.get('user')
return results

View File

@ -1,175 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from . import tweepy
from ..NotifyBase import NotifyBase
from ...common import NotifyType
class NotifyTwitter(NotifyBase):
"""
A wrapper to Twitter Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Twitter'
# The services URL
service_url = 'https://twitter.com/'
# The default secure protocol
secure_protocol = 'tweet'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twitter'
# The maximum allowable characters allowed in the body per message
# This is used during a Private DM Message Size (not Public Tweets
# which are limited to 240 characters)
body_maxlen = 4096
# Twitter does have titles when creating a message
title_maxlen = 0
def __init__(self, ckey, csecret, akey, asecret, **kwargs):
"""
Initialize Twitter Object
"""
super(NotifyTwitter, self).__init__(**kwargs)
if not ckey:
raise TypeError(
'An invalid Consumer API Key was specified.'
)
if not csecret:
raise TypeError(
'An invalid Consumer Secret API Key was specified.'
)
if not akey:
raise TypeError(
'An invalid Acess Token API Key was specified.'
)
if not asecret:
raise TypeError(
'An invalid Acess Token Secret API Key was specified.'
)
if not self.user:
raise TypeError(
'No user was specified.'
)
# Store our data
self.ckey = ckey
self.csecret = csecret
self.akey = akey
self.asecret = asecret
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Twitter Notification
"""
try:
# Attempt to Establish a connection to Twitter
self.auth = tweepy.OAuthHandler(self.ckey, self.csecret)
# Apply our Access Tokens
self.auth.set_access_token(self.akey, self.asecret)
except Exception:
self.logger.warning(
'Twitter authentication failed; '
'please verify your configuration.'
)
return False
# Always call throttle before any remote server i/o is made to avoid
# thrashing the remote server and risk being blocked.
self.throttle()
try:
# Get our API
api = tweepy.API(self.auth)
# Send our Direct Message
api.send_direct_message(self.user, text=body)
self.logger.info('Sent Twitter DM notification.')
except Exception as e:
self.logger.warning(
'A Connection error occured sending Twitter '
'direct message to %s.' % self.user)
self.logger.debug('Twitter Exception: %s' % str(e))
# Return; we're done
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
# The first token is stored in the hostname
consumer_key = results['host']
# Now fetch the remaining tokens
try:
consumer_secret, access_token_key, access_token_secret = \
[x for x in filter(bool, NotifyBase.split_path(
results['fullpath']))][0:3]
except (ValueError, AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
consumer_secret = None
access_token_key = None
access_token_secret = None
results['ckey'] = consumer_key
results['csecret'] = consumer_secret
results['akey'] = access_token_key
results['asecret'] = access_token_secret
return results

View File

@ -23,8 +23,153 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from . import NotifyTwitter
from . import tweepy
from ..NotifyBase import NotifyBase
from ...common import NotifyType
__all__ = [
'NotifyTwitter',
]
class NotifyTwitter(NotifyBase):
"""
A wrapper to Twitter Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Twitter'
# The services URL
service_url = 'https://twitter.com/'
# The default secure protocol
secure_protocol = 'tweet'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twitter'
# The maximum allowable characters allowed in the body per message
# This is used during a Private DM Message Size (not Public Tweets
# which are limited to 240 characters)
body_maxlen = 4096
# Twitter does have titles when creating a message
title_maxlen = 0
def __init__(self, ckey, csecret, akey, asecret, **kwargs):
"""
Initialize Twitter Object
"""
super(NotifyTwitter, self).__init__(**kwargs)
if not ckey:
raise TypeError(
'An invalid Consumer API Key was specified.'
)
if not csecret:
raise TypeError(
'An invalid Consumer Secret API Key was specified.'
)
if not akey:
raise TypeError(
'An invalid Acess Token API Key was specified.'
)
if not asecret:
raise TypeError(
'An invalid Acess Token Secret API Key was specified.'
)
if not self.user:
raise TypeError(
'No user was specified.'
)
# Store our data
self.ckey = ckey
self.csecret = csecret
self.akey = akey
self.asecret = asecret
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Twitter Notification
"""
try:
# Attempt to Establish a connection to Twitter
self.auth = tweepy.OAuthHandler(self.ckey, self.csecret)
# Apply our Access Tokens
self.auth.set_access_token(self.akey, self.asecret)
except Exception:
self.logger.warning(
'Twitter authentication failed; '
'please verify your configuration.'
)
return False
# Always call throttle before any remote server i/o is made to avoid
# thrashing the remote server and risk being blocked.
self.throttle()
try:
# Get our API
api = tweepy.API(self.auth)
# Send our Direct Message
api.send_direct_message(self.user, text=body)
self.logger.info('Sent Twitter DM notification.')
except Exception as e:
self.logger.warning(
'A Connection error occured sending Twitter '
'direct message to %s.' % self.user)
self.logger.debug('Twitter Exception: %s' % str(e))
# Return; we're done
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
# The first token is stored in the hostname
consumer_key = results['host']
# Now fetch the remaining tokens
try:
consumer_secret, access_token_key, access_token_secret = \
[x for x in filter(bool, NotifyBase.split_path(
results['fullpath']))][0:3]
except (ValueError, AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
consumer_secret = None
access_token_key = None
access_token_secret = None
results['ckey'] = consumer_key
results['csecret'] = consumer_secret
results['akey'] = access_token_key
results['asecret'] = access_token_secret
return results

View File

@ -23,44 +23,24 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import six
import re
from os import listdir
from os.path import dirname
from os.path import abspath
# Used for testing
from . import NotifyEmail as NotifyEmailBase
from .NotifyBoxcar import NotifyBoxcar
from .NotifyDBus import NotifyDBus
from .NotifyDiscord import NotifyDiscord
from .NotifyEmail import NotifyEmail
from .NotifyEmby import NotifyEmby
from .NotifyFaast import NotifyFaast
from .NotifyGotify import NotifyGotify
from .NotifyGrowl.NotifyGrowl import NotifyGrowl
from .NotifyGnome import NotifyGnome
from .NotifyIFTTT import NotifyIFTTT
from .NotifyJoin import NotifyJoin
from .NotifyJSON import NotifyJSON
from .NotifyMatrix import NotifyMatrix
from .NotifyMatterMost import NotifyMatterMost
from .NotifyProwl import NotifyProwl
from .NotifyPushed import NotifyPushed
from .NotifyPushBullet import NotifyPushBullet
from .NotifyPushjet.NotifyPushjet import NotifyPushjet
from .NotifyPushover import NotifyPushover
from .NotifyRocketChat import NotifyRocketChat
from .NotifyRyver import NotifyRyver
from .NotifySlack import NotifySlack
from .NotifySNS import NotifySNS
from .NotifyTelegram import NotifyTelegram
from .NotifyTwitter.NotifyTwitter import NotifyTwitter
from .NotifyXBMC import NotifyXBMC
from .NotifyXML import NotifyXML
from .NotifyWindows import NotifyWindows
# Required until re-factored into base code
from .NotifyPushjet import pushjet
from .NotifyGrowl import gntp
from .NotifyTwitter import tweepy
# NotifyBase object is passed in as a module not class
from . import NotifyBase
from ..common import NotifyImageSize
from ..common import NOTIFY_IMAGE_SIZES
from ..common import NotifyType
@ -69,21 +49,12 @@ from ..common import NOTIFY_TYPES
# Maintains a mapping of all of the Notification services
SCHEMA_MAP = {}
__all__ = [
# Notification Services
'NotifyBoxcar', 'NotifyDBus', 'NotifyEmail', 'NotifyEmby', 'NotifyDiscord',
'NotifyFaast', 'NotifyGnome', 'NotifyGotify', 'NotifyGrowl', 'NotifyIFTTT',
'NotifyJoin', 'NotifyJSON', 'NotifyMatrix', 'NotifyMatterMost',
'NotifyProwl', 'NotifyPushed', 'NotifyPushBullet', 'NotifyPushjet',
'NotifyPushover', 'NotifyRocketChat', 'NotifyRyver', 'NotifySlack',
'NotifySNS', 'NotifyTwitter', 'NotifyTelegram', 'NotifyXBMC',
'NotifyXML', 'NotifyWindows',
# Reference
'NotifyImageSize', 'NOTIFY_IMAGE_SIZES', 'NotifyType', 'NOTIFY_TYPES',
'NotifyBase',
# NotifyEmail Base References (used for Testing)
# NotifyEmail Base Module (used for NotifyEmail testing)
'NotifyEmailBase',
# gntp (used for NotifyGrowl Testing)
@ -98,24 +69,57 @@ __all__ = [
# Load our Lookup Matrix
def __load_matrix():
def __load_matrix(path=abspath(dirname(__file__)), name='apprise.plugins'):
"""
Dynamically load our schema map; this allows us to gracefully
skip over modules we simply don't have the dependencies for.
"""
# Used for the detection of additional Notify Services objects
# The .py extension is optional as we support loading directories too
module_re = re.compile(r'^(?P<name>Notify[a-z0-9]+)(\.py)?$', re.I)
thismodule = sys.modules[__name__]
for f in listdir(path):
match = module_re.match(f)
if not match:
# keep going
continue
# to add it's mapping to our hash table
for entry in dir(thismodule):
# Store our notification/plugin name:
plugin_name = match.group('name')
try:
module = __import__(
'{}.{}'.format(name, plugin_name),
globals(), locals(),
fromlist=[plugin_name])
except ImportError:
# No problem, we can't use this object
continue
if not hasattr(module, plugin_name):
# Not a library we can load as it doesn't follow the simple rule
# that the class must bear the same name as the notification
# file itself.
continue
# Get our plugin
plugin = getattr(thismodule, entry)
if not hasattr(plugin, 'app_id'): # pragma: no branch
plugin = getattr(module, plugin_name)
if not hasattr(plugin, 'app_id'):
# Filter out non-notification modules
continue
elif plugin_name in __all__:
# we're already handling this object
continue
# Add our module name to our __all__
__all__.append(plugin_name)
# Ensure we provide the class as the reference to this directory and
# not the module:
globals()[plugin_name] = plugin
# Load protocol(s) if defined
proto = getattr(plugin, 'protocol', None)
if isinstance(proto, six.string_types):
@ -140,6 +144,8 @@ def __load_matrix():
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
return SCHEMA_MAP
# Dynamically build our module
# Dynamically build our schema base
__load_matrix()

View File

@ -24,6 +24,7 @@
# THE SOFTWARE.
from __future__ import print_function
import sys
import six
import pytest
import requests
@ -670,3 +671,68 @@ def test_apprise_details():
# a list of entrys that do not have a string defined.
assert(not len([x['service_name'] for x in details['schemas']
if not isinstance(x['service_name'], six.string_types)]))
def test_notify_matrix_dynamic_importing(tmpdir):
"""
API: Apprise() Notify Matrix Importing
"""
# Make our new path valid
suite = tmpdir.mkdir("apprise_notify_test_suite")
suite.join("__init__.py").write('')
module_name = 'badnotify'
# Update our path to point to our new test suite
sys.path.insert(0, str(suite))
# Create a base area to work within
base = suite.mkdir(module_name)
base.join("__init__.py").write('')
# Test no app_id
base.join('NotifyBadFile1.py').write(
"""
class NotifyBadFile1(object):
pass""")
# No class of the same name
base.join('NotifyBadFile2.py').write(
"""
class BadClassName(object):
pass""")
# Exception thrown
base.join('NotifyBadFile3.py').write("""raise ImportError()""")
# Utilizes a schema:// already occupied (as string)
base.join('NotifyGoober.py').write(
"""
from apprise import NotifyBase
class NotifyGoober(NotifyBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by NotifyMail)
protocol = 'mailto'
# The default secure protocol (used by NotifyMail)
secure_protocol = 'mailtos'""")
# Utilizes a schema:// already occupied (as tuple)
base.join('NotifyBugger.py').write("""
from apprise import NotifyBase
class NotifyBugger(NotifyBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by NotifyMail), the other
# isn't
protocol = ('mailto', 'bugger-test' )
# The default secure protocol (used by NotifyMail), the other isn't
secure_protocol = ('mailtos', 'bugger-tests')""")
__load_matrix(path=str(base), name=module_name)

View File

@ -23,9 +23,11 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import six
from apprise.AppriseAsset import AppriseAsset
from apprise.config.ConfigBase import ConfigBase
from apprise.config import __load_matrix
# Disable logging for a cleaner testing output
import logging
@ -47,7 +49,7 @@ def test_config_base():
except TypeError:
assert(True)
# Notify format types are not the same as ConfigBase ones
# Config format types are not the same as ConfigBase ones
try:
ConfigBase(**{'format': 'markdown'})
# We should never reach here as an exception should be thrown
@ -591,3 +593,68 @@ urls:
assert len(result[5].tags) == 4
assert 'customer' in result[5].tags
assert 'chris' in result[5].tags
def test_config_matrix_dynamic_importing(tmpdir):
"""
API: Apprise() Config Matrix Importing
"""
# Make our new path valid
suite = tmpdir.mkdir("apprise_config_test_suite")
suite.join("__init__.py").write('')
module_name = 'badconfig'
# Update our path to point to our new test suite
sys.path.insert(0, str(suite))
# Create a base area to work within
base = suite.mkdir(module_name)
base.join("__init__.py").write('')
# Test no app_id
base.join('ConfigBadFile1.py').write(
"""
class ConfigBadFile1(object):
pass""")
# No class of the same name
base.join('ConfigBadFile2.py').write(
"""
class BadClassName(object):
pass""")
# Exception thrown
base.join('ConfigBadFile3.py').write("""raise ImportError()""")
# Utilizes a schema:// already occupied (as string)
base.join('ConfigGoober.py').write(
"""
from apprise import ConfigBase
class ConfigGoober(ConfigBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by ConfigMail)
protocol = 'http'
# The default secure protocol (used by ConfigMail)
secure_protocol = 'https'""")
# Utilizes a schema:// already occupied (as tuple)
base.join('ConfigBugger.py').write("""
from apprise import ConfigBase
class ConfigBugger(ConfigBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by ConfigMail), the other
# isn't
protocol = ('http', 'bugger-test' )
# The default secure protocol (used by ConfigMail), the other isn't
secure_protocol = ('https', 'bugger-tests')""")
__load_matrix(path=str(base), name=module_name)

View File

@ -112,8 +112,11 @@ def test_plugin(mock_refresh, mock_send):
obj = Apprise.instantiate(url, suppress_exceptions=False)
if obj is None:
# We're done (assuming this is what we were expecting)
assert instance is None
if instance is not None:
# We're done (assuming this is what we were expecting)
print("{} didn't instantiate itself "
"(we expected it to)".format(url))
assert False
continue
if instance is None:

View File

@ -1610,8 +1610,11 @@ def test_rest_plugins(mock_post, mock_get):
url, asset=asset, suppress_exceptions=False)
if obj is None:
# We're done (assuming this is what we were expecting)
assert instance is None
if instance is not None:
# We're done (assuming this is what we were expecting)
print("{} didn't instantiate itself "
"(we expected it to)".format(url))
assert False
continue
if instance is None: