Merge pull request #74 from caronc/api-improvements

API Improvements, Overflow support, Throttling Refactored, IFTTT Refactored, and +/- Params Support
This commit is contained in:
Chris Caron 2019-02-20 12:41:03 -05:00 committed by GitHub
commit c6cf06e6f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 2632 additions and 600 deletions

View File

@ -26,7 +26,7 @@ matrix:
install:
- pip install .
- pip install codecov tox
- pip install codecov
- pip install -r dev-requirements.txt
- pip install -r requirements.txt
- if [[ $TRAVIS_PYTHON_VERSION != 'pypy'* ]]; then travis_retry pip install dbus-python; fi

View File

@ -39,7 +39,7 @@ The table below identifies the services this tool supports and some example serv
| [Faast](https://github.com/caronc/apprise/wiki/Notify_faast) | faast:// | (TCP) 443 | faast://authorizationtoken
| [Gnome](https://github.com/caronc/apprise/wiki/Notify_gnome) | gnome:// | n/a | gnome://
| [Growl](https://github.com/caronc/apprise/wiki/Notify_growl) | growl:// | (UDP) 23053 | growl://hostname<br />growl://hostname:portno<br />growl://password@hostname<br />growl://password@hostname:port</br>**Note**: you can also use the get parameter _version_ which can allow the growl request to behave using the older v1.x protocol. An example would look like: growl://hostname?version=1
| [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/EventToTrigger<br />ifttt://webhooksID/EventToTrigger/Value1/Value2/Value3<br />ifttt://webhooksID/EventToTrigger/?Value3=NewEntry&Value2=AnotherValue
| [IFTTT](https://github.com/caronc/apprise/wiki/Notify_ifttt) | ifttt:// | (TCP) 443 | ifttt://webhooksID/Event<br />ifttt://webhooksID/Event1/Event2/EventN<br/>ifttt://webhooksID/Event1/?+Key=Value<br/>ifttt://webhooksID/Event1/?-Key=value1
| [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device<br />join://apikey/device1/device2/deviceN/<br />join://apikey/group<br />join://apikey/groupA/groupB/groupN<br />join://apikey/DeviceA/groupA/groupN/DeviceN/
| [KODI](https://github.com/caronc/apprise/wiki/Notify_kodi) | kodi:// or kodis:// | (TCP) 8080 or 443 | kodi://hostname<br />kodi://user@hostname<br />kodi://user:password@hostname:port
| [Matrix](https://github.com/caronc/apprise/wiki/Notify_matrix) | matrix:// or matrixs:// | (TCP) 80 or 443 | matrix://token<br />matrix://user@token<br />matrixs://token?mode=slack<br />matrixs://user@token
@ -110,8 +110,8 @@ apobj.add('pbul://o.gn5kj6nfhv736I7jC3cj3QLRiyhgl98b')
# Then notify these services any time you desire. The below would
# notify all of the services loaded into our Apprise object.
apobj.notify(
title='my notification title',
body='what a great notification service!',
title='my notification title',
)
```

View File

@ -171,7 +171,7 @@ class Apprise(object):
# URL information
plugin = SCHEMA_MAP[results['schema']](**results)
except:
except Exception:
# the arguments are invalid or can not be used.
logger.error('Could not load URL: %s' % url)
return None
@ -238,7 +238,7 @@ class Apprise(object):
"""
self.servers[:] = []
def notify(self, title, body, notify_type=NotifyType.INFO,
def notify(self, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=None):
"""
Send a notification to all of the plugins previously loaded.
@ -366,8 +366,8 @@ class Apprise(object):
try:
# Send notification
if not server.notify(
title=title,
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type):
# Toggle our return status flag
@ -375,7 +375,6 @@ class Apprise(object):
except TypeError:
# These our our internally thrown notifications
# TODO: Change this to a custom one such as AppriseNotifyError
status = False
except Exception:
@ -432,6 +431,33 @@ class Apprise(object):
return response
def urls(self):
"""
Returns all of the loaded URLs defined in this apprise object.
"""
return [x.url() for x in self.servers]
def pop(self, index):
"""
Removes an indexed Notification Service from the stack and
returns it.
"""
# Remove our entry
return self.servers.pop(index)
def __getitem__(self, index):
"""
Returns the indexed server entry of a loaded notification server
"""
return self.servers[index]
def __iter__(self):
"""
Returns an iterator to our server list
"""
return iter(self.servers)
def __len__(self):
"""
Returns the number of servers loaded

View File

@ -37,6 +37,8 @@ from .common import NotifyImageSize
from .common import NOTIFY_IMAGE_SIZES
from .common import NotifyFormat
from .common import NOTIFY_FORMATS
from .common import OverflowMode
from .common import OVERFLOW_MODES
from .plugins.NotifyBase import NotifyBase
from .Apprise import Apprise
@ -52,6 +54,6 @@ __all__ = [
'Apprise', 'AppriseAsset', 'NotifyBase',
# Reference
'NotifyType', 'NotifyImageSize', 'NotifyFormat', 'NOTIFY_TYPES',
'NOTIFY_IMAGE_SIZES', 'NOTIFY_FORMATS',
'NotifyType', 'NotifyImageSize', 'NotifyFormat', 'OverflowMode',
'NOTIFY_TYPES', 'NOTIFY_IMAGE_SIZES', 'NOTIFY_FORMATS', 'OVERFLOW_MODES',
]

View File

@ -77,3 +77,31 @@ NOTIFY_FORMATS = (
NotifyFormat.HTML,
NotifyFormat.MARKDOWN,
)
class OverflowMode(object):
"""
A list of pre-defined modes of how to handle the text when it exceeds the
defined maximum message size.
"""
# Send the data as is; untouched. Let the upstream server decide how the
# content is handled. Some upstream services might gracefully handle this
# with expected intentions; others might not.
UPSTREAM = 'upstream'
# Always truncate the text when it exceeds the maximum message size and
# send it anyway
TRUNCATE = 'truncate'
# Split the message into multiple smaller messages that fit within the
# limits of what is expected. The smaller messages are sent
SPLIT = 'split'
# Define our modes so we can verify if we need to
OVERFLOW_MODES = (
OverflowMode.UPSTREAM,
OverflowMode.TRUNCATE,
OverflowMode.SPLIT,
)

View File

@ -26,6 +26,8 @@
import re
import logging
from time import sleep
from datetime import datetime
try:
# Python 2.7
from urllib import unquote as _unquote
@ -42,9 +44,12 @@ from ..utils import parse_url
from ..utils import parse_bool
from ..utils import parse_list
from ..utils import is_hostname
from ..common import NotifyType
from ..common import NOTIFY_TYPES
from ..common import NotifyFormat
from ..common import NOTIFY_FORMATS
from ..common import OverflowMode
from ..common import OVERFLOW_MODES
from ..AppriseAsset import AppriseAsset
@ -52,13 +57,6 @@ from ..AppriseAsset import AppriseAsset
from xml.sax.saxutils import escape as sax_escape
def _escape(text):
"""
saxutil escape tool
"""
return sax_escape(text, {"'": "&apos;", "\"": "&quot;"})
HTTP_ERROR_MAP = {
400: 'Bad Request - Unsupported Parameters.',
401: 'Verification Failed.',
@ -113,21 +111,33 @@ class NotifyBase(object):
setup_url = None
# Most Servers do not like more then 1 request per 5 seconds, so 5.5 gives
# us a safe play range...
throttle_attempt = 5.5
# us a safe play range.
request_rate_per_sec = 5.5
# Allows the user to specify the NotifyImageSize object
image_size = None
# The maximum allowable characters allowed in the body per message
body_maxlen = 32768
# We set it to what would virtually be an infinite value really
# 2^63 - 1 = 9223372036854775807
body_maxlen = 9223372036854775807
# Defines the maximum allowable characters in the title
# Defines the maximum allowable characters in the title; set this to zero
# if a title can't be used. Titles that are not used but are defined are
# automatically placed into the body
title_maxlen = 250
# Set the maximum line count; if this is set to anything larger then zero
# the message (prior to it being sent) will be truncated to this number
# of lines. Setting this to zero disables this feature.
body_max_line_count = 0
# Default Notify Format
notify_format = NotifyFormat.TEXT
# Default Overflow Mode
overflow_mode = OverflowMode.UPSTREAM
# Maintain a set of tags to associate with this specific notification
tags = set()
@ -162,7 +172,6 @@ class NotifyBase(object):
self.user = kwargs.get('user')
self.password = kwargs.get('password')
self.headers = kwargs.get('headers')
if 'format' in kwargs:
# Store the specified format if specified
@ -177,25 +186,64 @@ class NotifyBase(object):
# Provide override
self.notify_format = notify_format
if 'overflow' in kwargs:
# Store the specified format if specified
overflow = kwargs.get('overflow', '')
if overflow.lower() not in OVERFLOW_MODES:
self.logger.error(
'Invalid overflow method %s' % overflow,
)
raise TypeError(
'Invalid overflow method %s' % overflow,
)
# Provide override
self.overflow_mode = overflow
if 'tag' in kwargs:
# We want to associate some tags with our notification service.
# the code below gets the 'tag' argument if defined, otherwise
# it just falls back to whatever was already defined globally
self.tags = set(parse_list(kwargs.get('tag', self.tags)))
def throttle(self, throttle_time=None):
# Tracks the time any i/o was made to the remote server. This value
# is automatically set and controlled through the throttle() call.
self._last_io_datetime = None
def throttle(self, last_io=None):
"""
A common throttle control
"""
self.logger.debug('Throttling...')
throttle_time = throttle_time \
if throttle_time is not None else self.throttle_attempt
if last_io is not None:
# Assume specified last_io
self._last_io_datetime = last_io
# Perform throttle
if throttle_time > 0:
sleep(throttle_time)
# Get ourselves a reference time of 'now'
reference = datetime.now()
if self._last_io_datetime is None:
# Set time to 'now' and no need to throttle
self._last_io_datetime = reference
return
if self.request_rate_per_sec <= 0.0:
# We're done if there is no throttle limit set
return
# If we reach here, we need to do additional logic.
# If the difference between the reference time and 'now' is less than
# the defined request_rate_per_sec then we need to throttle for the
# remaining balance of this time.
elapsed = (reference - self._last_io_datetime).total_seconds()
if elapsed < self.request_rate_per_sec:
self.logger.debug('Throttling for {}s...'.format(
self.request_rate_per_sec - elapsed))
sleep(self.request_rate_per_sec - elapsed)
# Update our timestamp before we leave
self._last_io_datetime = reference
return
def image_url(self, notify_type, logo=False, extension=None):
@ -260,6 +308,117 @@ class NotifyBase(object):
color_type=color_type,
)
def notify(self, body, title=None, notify_type=NotifyType.INFO,
overflow=None, **kwargs):
"""
Performs notification
"""
# Handle situations where the title is None
title = '' if not title else title
# Apply our overflow (if defined)
for chunk in self._apply_overflow(body=body, title=title,
overflow=overflow):
# Send notification
if not self.send(body=chunk['body'], title=chunk['title'],
notify_type=notify_type):
# Toggle our return status flag
return False
return True
def _apply_overflow(self, body, title=None, overflow=None):
"""
Takes the message body and title as input. This function then
applies any defined overflow restrictions associated with the
notification service and may alter the message if/as required.
The function will always return a list object in the following
structure:
[
{
title: 'the title goes here',
body: 'the message body goes here',
},
{
title: 'the title goes here',
body: 'the message body goes here',
},
]
"""
response = list()
# tidy
title = '' if not title else title.strip()
body = '' if not body else body.rstrip()
if overflow is None:
# default
overflow = self.overflow_mode
if self.title_maxlen <= 0:
# Content is appended to body
body = '{}\r\n{}'.format(title, body)
title = ''
# Enforce the line count first always
if self.body_max_line_count > 0:
# Limit results to just the first 2 line otherwise
# there is just to much content to display
body = re.split(r'\r*\n', body)
body = '\r\n'.join(body[0:self.body_max_line_count])
if overflow == OverflowMode.UPSTREAM:
# Nothing more to do
response.append({'body': body, 'title': title})
return response
elif len(title) > self.title_maxlen:
# Truncate our Title
title = title[:self.title_maxlen]
if self.body_maxlen > 0 and len(body) <= self.body_maxlen:
response.append({'body': body, 'title': title})
return response
if overflow == OverflowMode.TRUNCATE:
# Truncate our body and return
response.append({
'body': body[:self.body_maxlen],
'title': title,
})
# For truncate mode, we're done now
return response
# If we reach here, then we are in SPLIT mode.
# For here, we want to split the message as many times as we have to
# in order to fit it within the designated limits.
response = [{
'body': body[i: i + self.body_maxlen],
'title': title} for i in range(0, len(body), self.body_maxlen)]
return response
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Should preform the actual notification itself.
"""
raise NotImplementedError("send() is implimented by the child class.")
def url(self):
"""
Assembles the URL associated with the notification based on the
arguments provied.
"""
raise NotImplementedError("url() is implimented by the child class.")
def __contains__(self, tags):
"""
Returns true if the tag specified is associated with this notification.
@ -290,12 +449,21 @@ class NotifyBase(object):
"""
Takes html text as input and escapes it so that it won't
conflict with any xml/html wrapping characters.
Args:
html (str): The HTML code to escape
convert_new_lines (:obj:`bool`, optional): escape new lines (\n)
whitespace (:obj:`bool`, optional): escape whitespace
Returns:
str: The escaped html
"""
if not html:
# nothing more to do; return object as is
return html
escaped = _escape(html)
# Escape HTML
escaped = sax_escape(html, {"'": "&apos;", "\"": "&quot;"})
if whitespace:
# Tidy up whitespace too
@ -311,8 +479,25 @@ class NotifyBase(object):
@staticmethod
def unquote(content, encoding='utf-8', errors='replace'):
"""
common unquote function
Replace %xx escapes by their single-character equivalent. The optional
encoding and errors parameters specify how to decode percent-encoded
sequences.
Wrapper to Python's unquote while remaining compatible with both
Python 2 & 3 since the reference to this function changed between
versions.
Note: errors set to 'replace' means that invalid sequences are
replaced by a placeholder character.
Args:
content (str): The quoted URI string you wish to unquote
encoding (:obj:`str`, optional): encoding type
errors (:obj:`str`, errors): how to handle invalid character found
in encoded string (defined by encoding)
Returns:
str: The unquoted URI string
"""
if not content:
return ''
@ -327,9 +512,25 @@ class NotifyBase(object):
@staticmethod
def quote(content, safe='/', encoding=None, errors=None):
"""
common quote function
""" Replaces single character non-ascii characters and URI specific
ones by their %xx code.
Wrapper to Python's unquote while remaining compatible with both
Python 2 & 3 since the reference to this function changed between
versions.
Args:
content (str): The URI string you wish to quote
safe (str): non-ascii characters and URI specific ones that you
do not wish to escape (if detected). Setting this
string to an empty one causes everything to be
escaped.
encoding (:obj:`str`, optional): encoding type
errors (:obj:`str`, errors): how to handle invalid character found
in encoded string (defined by encoding)
Returns:
str: The quoted URI string
"""
if not content:
return ''
@ -344,26 +545,60 @@ class NotifyBase(object):
@staticmethod
def urlencode(query, doseq=False, safe='', encoding=None, errors=None):
"""
common urlencode function
"""Convert a mapping object or a sequence of two-element tuples
Wrapper to Python's unquote while remaining compatible with both
Python 2 & 3 since the reference to this function changed between
versions.
The resulting string is a series of key=value pairs separated by '&'
characters, where both key and value are quoted using the quote()
function.
Note: If the dictionary entry contains an entry that is set to None
it is not included in the final result set. If you want to
pass in an empty variable, set it to an empty string.
Args:
query (str): The dictionary to encode
doseq (:obj:`bool`, optional): Handle sequences
safe (:obj:`str`): non-ascii characters and URI specific ones that
you do not wish to escape (if detected). Setting this string
to an empty one causes everything to be escaped.
encoding (:obj:`str`, optional): encoding type
errors (:obj:`str`, errors): how to handle invalid character found
in encoded string (defined by encoding)
Returns:
str: The escaped parameters returned as a string
"""
# Tidy query by eliminating any records set to None
_query = {k: v for (k, v) in query.items() if v is not None}
try:
# Python v3.x
return _urlencode(
query, doseq=doseq, safe=safe, encoding=encoding,
_query, doseq=doseq, safe=safe, encoding=encoding,
errors=errors)
except TypeError:
# Python v2.7
return _urlencode(query)
return _urlencode(_query)
@staticmethod
def split_path(path, unquote=True):
"""
Splits a URL up into a list object.
"""Splits a URL up into a list object.
Parses a specified URL and breaks it into a list.
Args:
path (str): The path to split up into a list.
unquote (:obj:`bool`, optional): call unquote on each element
added to the returned list.
Returns:
list: A list containing all of the elements in the path
"""
if unquote:
return PATHSPLIT_LIST_DELIM.split(
NotifyBase.unquote(path).lstrip('/'))
@ -371,26 +606,51 @@ class NotifyBase(object):
@staticmethod
def is_email(address):
"""
Returns True if specified entry is an email address
"""Determine if the specified entry is an email address
Args:
address (str): The string you want to check.
Returns:
bool: Returns True if the address specified is an email address
and False if it isn't.
"""
return IS_EMAIL_RE.match(address) is not None
@staticmethod
def is_hostname(hostname):
"""
Returns True if specified entry is a hostname
"""Determine if the specified entry is a hostname
Args:
hostname (str): The string you want to check.
Returns:
bool: Returns True if the hostname specified is in fact a hostame
and False if it isn't.
"""
return is_hostname(hostname)
@staticmethod
def parse_url(url, verify_host=True):
"""
Parses the URL and returns it broken apart into a dictionary.
"""Parses the URL and returns it broken apart into a dictionary.
This is very specific and customized for Apprise.
Args:
url (str): The URL you want to fully parse.
verify_host (:obj:`bool`, optional): a flag kept with the parsed
URL which some child classes will later use to verify SSL
keys (if SSL transactions take place). Unless under very
specific circumstances, it is strongly recomended that
you leave this default value set to True.
Returns:
A dictionary is returned containing the URL fully parsed if
successful, otherwise None is returned.
"""
results = parse_url(
url, default_schema='unknown', verify_host=verify_host)
@ -417,6 +677,15 @@ class NotifyBase(object):
results['format']))
del results['format']
# Allow overriding the default overflow
if 'overflow' in results['qsd']:
results['overflow'] = results['qsd'].get('overflow')
if results['overflow'] not in OVERFLOW_MODES:
NotifyBase.logger.warning(
'Unsupported overflow specified {}'.format(
results['overflow']))
del results['overflow']
# Password overrides
if 'pass' in results['qsd']:
results['password'] = results['qsd']['pass']
@ -425,6 +694,4 @@ class NotifyBase(object):
if 'user' in results['qsd']:
results['user'] = results['qsd']['user']
results['headers'] = {k[1:]: v for k, v in results['qsd'].items()
if re.match(r'^-.', k)}
return results

View File

@ -23,12 +23,13 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from json import dumps
import requests
import re
from time import time
import requests
import hmac
from json import dumps
from time import time
from hashlib import sha1
from itertools import chain
try:
from urlparse import urlparse
@ -37,7 +38,7 @@ except ImportError:
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
from ..common import NotifyImageSize
from ..utils import compat_is_basestring
@ -168,7 +169,7 @@ class NotifyBoxcar(NotifyBase):
'(%s) specified.' % recipient,
)
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Boxcar Notification
"""
@ -229,6 +230,9 @@ class NotifyBoxcar(NotifyBase):
))
self.logger.debug('Boxcar Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,
@ -272,6 +276,27 @@ class NotifyBoxcar(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
return '{schema}://{access}/{secret}/{recipients}/?{args}'.format(
schema=self.secure_protocol,
access=self.quote(self.access),
secret=self.quote(self.secret),
recipients='/'.join([
self.quote(x) for x in chain(
self.tags, self.device_tokens) if x != DEFAULT_TAG]),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -26,10 +26,9 @@
from __future__ import absolute_import
from __future__ import print_function
import re
from .NotifyBase import NotifyBase
from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import GET_SCHEMA_RE
# Default our global support flag
@ -142,12 +141,23 @@ class NotifyDBus(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_dbus'
# No throttling required for DBus queries
request_rate_per_sec = 0
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# The number of seconds to keep the message present for
message_timeout_ms = 13000
# Limit results to just the first 10 line otherwise there is just to much
# content to display
body_max_line_count = 10
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# This entry is a bit hacky, but it allows us to unit-test this library
# in an environment that simply doesn't have the gnome packages
# available to us. It also allows us to handle situations where the
@ -190,7 +200,7 @@ class NotifyDBus(NotifyBase):
self.x_axis = x_axis
self.y_axis = y_axis
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform DBus Notification
"""
@ -249,16 +259,10 @@ class NotifyDBus(NotifyBase):
"Could not load Gnome notification icon ({}): {}"
.format(icon_path, e))
# Limit results to just the first 10 line otherwise
# there is just to much content to display
body = re.split('[\r\n]+', body)
if title:
# Place title on first line if it exists
body.insert(0, title)
body = '\r\n'.join(body[0:10])
try:
# Always call throttle() before any remote execution is made
self.throttle()
dbus_iface.Notify(
# Application Identifier
self.app_id,
@ -280,13 +284,20 @@ class NotifyDBus(NotifyBase):
self.logger.info('Sent DBus notification.')
except Exception as e:
except Exception:
self.logger.warning('Failed to send DBus notification.')
self.logger.exception('DBus Exception')
return False
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
return '{schema}://'.format(schema=self.schema)
@staticmethod
def parse_url(url):
"""

View File

@ -48,6 +48,7 @@ from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import parse_bool
@ -113,7 +114,7 @@ class NotifyDiscord(NotifyBase):
return
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Discord Notification
"""
@ -180,8 +181,8 @@ class NotifyDiscord(NotifyBase):
else:
# not markdown
payload['content'] = body if not title \
else "{}\r\n{}".format(title, body)
payload['content'] = \
body if not title else "{}\r\n{}".format(title, body)
if self.avatar and image_url:
payload['avatar_url'] = image_url
@ -201,6 +202,10 @@ class NotifyDiscord(NotifyBase):
notify_url, self.verify_certificate,
))
self.logger.debug('Discord Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,
@ -241,6 +246,28 @@ class NotifyDiscord(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'tts': 'yes' if self.tts else 'no',
'avatar': 'yes' if self.avatar else 'no',
'footer': 'yes' if self.footer else 'no',
'thumbnail': 'yes' if self.thumbnail else 'no',
}
return '{schema}://{webhook_id}/{webhook_token}/?{args}'.format(
schema=self.secure_protocol,
webhook_id=self.quote(self.webhook_id),
webhook_token=self.quote(self.webhook_token),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -24,15 +24,14 @@
# THE SOFTWARE.
import re
from datetime import datetime
import smtplib
from socket import error as SocketError
from email.mime.text import MIMEText
from socket import error as SocketError
from datetime import datetime
from .NotifyBase import NotifyBase
from ..common import NotifyFormat
from ..common import NotifyType
class WebBaseLogin(object):
@ -344,7 +343,7 @@ class NotifyEmail(NotifyBase):
break
def notify(self, title, body, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Email Notification
"""
@ -375,6 +374,10 @@ class NotifyEmail(NotifyBase):
# bind the socket variable to the current namespace
socket = None
# Always call throttle before any remote server i/o is made
self.throttle()
try:
self.logger.debug('Connecting to remote SMTP server...')
socket_func = smtplib.SMTP
@ -421,6 +424,53 @@ class NotifyEmail(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'to': self.to_addr,
'from': self.from_addr,
'name': self.from_name,
'mode': self.secure_mode,
'smtp': self.smtp_host,
'timeout': self.timeout,
'user': self.user,
}
# pull email suffix from username (if present)
user = self.user.split('@')[0]
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(user, safe=''),
password=self.quote(self.password, safe=''),
)
else:
# user url
auth = '{user}@'.format(
user=self.quote(user, safe=''),
)
# Default Port setup
default_port = \
self.default_secure_port if self.secure else self.default_port
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -37,6 +37,7 @@ from json import loads
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..utils import parse_bool
from ..common import NotifyType
from .. import __version__ as VERSION
@ -445,7 +446,7 @@ class NotifyEmby(NotifyBase):
self.user_id = None
return True
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Emby Notification
"""
@ -494,6 +495,10 @@ class NotifyEmby(NotifyBase):
session_url, self.verify_certificate,
))
self.logger.debug('Emby Payload: %s' % str(payload))
# Always call throttle before the requests are made
self.throttle()
try:
r = requests.post(
session_url,
@ -535,6 +540,39 @@ class NotifyEmby(NotifyBase):
return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'modal': 'yes' if self.modal else 'no',
}
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
else: # self.user is set
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == self.emby_default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@property
def is_authenticated(self):
"""

View File

@ -27,6 +27,7 @@ import requests
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..common import NotifyType
class NotifyFaast(NotifyBase):
@ -60,7 +61,7 @@ class NotifyFaast(NotifyBase):
self.authtoken = authtoken
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Faast Notification
"""
@ -85,6 +86,10 @@ class NotifyFaast(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('Faast Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
@ -124,6 +129,23 @@ class NotifyFaast(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
return '{schema}://{authtoken}/?{args}'.format(
schema=self.protocol,
authtoken=self.quote(self.authtoken, safe=''),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -26,10 +26,9 @@
from __future__ import absolute_import
from __future__ import print_function
import re
from .NotifyBase import NotifyBase
from ..common import NotifyImageSize
from ..common import NotifyType
# Default our global support flag
NOTIFY_GNOME_SUPPORT_ENABLED = False
@ -86,6 +85,18 @@ class NotifyGnome(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# Disable throttle rate for Gnome requests since they are normally
# local anyway
request_rate_per_sec = 0
# Limit results to just the first 10 line otherwise there is just to much
# content to display
body_max_line_count = 10
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# This entry is a bit hacky, but it allows us to unit-test this library
# in an environment that simply doesn't have the gnome packages
# available to us. It also allows us to handle situations where the
@ -109,7 +120,7 @@ class NotifyGnome(NotifyBase):
else:
self.urgency = urgency
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Gnome Notification
"""
@ -119,15 +130,6 @@ class NotifyGnome(NotifyBase):
"Gnome Notifications are not supported by this system.")
return False
# Limit results to just the first 10 line otherwise
# there is just to much content to display
body = re.split('[\r\n]+', body)
if title:
# Place title on first line if it exists
body.insert(0, title)
body = '\r\n'.join(body[0:10])
try:
# App initialization
Notify.init(self.app_id)
@ -141,6 +143,9 @@ class NotifyGnome(NotifyBase):
# Assign urgency
notification.set_urgency(self.urgency)
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Use Pixbuf to create the proper image type
image = GdkPixbuf.Pixbuf.new_from_file(icon_path)
@ -157,13 +162,20 @@ class NotifyGnome(NotifyBase):
notification.show()
self.logger.info('Sent Gnome notification.')
except Exception as e:
except Exception:
self.logger.warning('Failed to send Gnome notification.')
self.logger.exception('Gnome Exception')
return False
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
return '{schema}://'.format(schema=self.protocol)
@staticmethod
def parse_url(url):
"""

View File

@ -23,12 +23,11 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
from .gntp import notifier
from .gntp import errors
from ..NotifyBase import NotifyBase
from ...common import NotifyImageSize
from ...common import NotifyType
# Priorities
@ -69,12 +68,24 @@ class NotifyGrowl(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_growl'
# Default Growl Port
default_port = 23053
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_72
# Disable throttle rate for Growl requests since they are normally
# local anyway
request_rate_per_sec = 0
# A title can not be used for Growl Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Limit results to just the first 10 line otherwise there is just to much
# content to display
body_max_line_count = 2
# Default Growl Port
default_port = 23053
def __init__(self, priority=None, version=2, **kwargs):
"""
Initialize Growl Object
@ -143,17 +154,11 @@ class NotifyGrowl(NotifyBase):
return
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Growl Notification
"""
# Limit results to just the first 2 line otherwise there is just to
# much content to display
body = re.split('[\r\n]+', body)
body[0] = body[0].strip('#').strip()
body = '\r\n'.join(body[0:2])
icon = None
if self.version >= 2:
# URL Based
@ -178,6 +183,9 @@ class NotifyGrowl(NotifyBase):
# print the binary contents of an image
payload['icon'] = icon
# Always call throttle before any remote server i/o is made
self.throttle()
try:
response = self.growl.notify(**payload)
if not isinstance(response, bool):
@ -207,6 +215,44 @@ class NotifyGrowl(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
GrowlPriority.LOW: 'low',
GrowlPriority.MODERATE: 'moderate',
GrowlPriority.NORMAL: 'normal',
GrowlPriority.HIGH: 'high',
GrowlPriority.EMERGENCY: 'emergency',
}
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'priority':
_map[GrowlPriority.NORMAL] if self.priority not in _map
else _map[self.priority],
'version': self.version,
}
auth = ''
if self.password:
auth = '{password}@'.format(
password=self.quote(self.user, safe=''),
)
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == self.default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
@ -239,15 +285,10 @@ class NotifyGrowl(NotifyBase):
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
_map = {
'l': GrowlPriority.LOW,
'-2': GrowlPriority.LOW,
'm': GrowlPriority.MODERATE,
'-1': GrowlPriority.MODERATE,
'n': GrowlPriority.NORMAL,
'0': GrowlPriority.NORMAL,
'h': GrowlPriority.HIGH,
'1': GrowlPriority.HIGH,
'e': GrowlPriority.EMERGENCY,
'2': GrowlPriority.EMERGENCY,
}
try:
results['priority'] = \

View File

@ -34,16 +34,18 @@
# URL. For example, it might look like this:
# https://maker.ifttt.com/use/a3nHB7gA9TfBQSqJAHklod
#
# In the above example a3nHB7gA9TfBQSqJAHklod becomes your {apikey}
# In the above example a3nHB7gA9TfBQSqJAHklod becomes your {webhook_id}
# You will need this to make this notification work correctly
#
# For each event you create you will assign it a name (this will be known as
# the {event} when building your URL.
import requests
from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
from ..utils import parse_list
class NotifyIFTTT(NotifyBase):
@ -59,7 +61,7 @@ class NotifyIFTTT(NotifyBase):
service_url = 'https://ifttt.com/'
# The default protocol
protocol = 'ifttt'
secure_protocol = 'ifttt'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_ifttt'
@ -87,37 +89,55 @@ class NotifyIFTTT(NotifyBase):
ifttt_default_type_key = 'value3'
# IFTTT uses the http protocol with JSON requests
notify_url = 'https://maker.ifttt.com/trigger/{event}/with/key/{apikey}'
notify_url = 'https://maker.ifttt.com/' \
'trigger/{event}/with/key/{webhook_id}'
def __init__(self, apikey, event, event_args=None, **kwargs):
def __init__(self, webhook_id, events, add_tokens=None, del_tokens=None,
**kwargs):
"""
Initialize IFTTT Object
add_tokens can optionally be a dictionary of key/value pairs
that you want to include in the IFTTT post to the server.
del_tokens can optionally be a list/tuple/set of tokens
that you want to eliminate from the IFTTT post. There isn't
much real functionality to this one unless you want to remove
reference to Value1, Value2, and/or Value3
"""
super(NotifyIFTTT, self).__init__(**kwargs)
if not apikey:
raise TypeError('You must specify the Webhooks apikey.')
if not webhook_id:
raise TypeError('You must specify the Webhooks webhook_id.')
if not event:
raise TypeError('You must specify the Event you wish to trigger.')
# Store our Events we wish to trigger
self.events = parse_list(events)
if not self.events:
raise TypeError(
'You must specify at least one event you wish to trigger on.')
# Store our APIKey
self.apikey = apikey
self.webhook_id = webhook_id
# Store our Event we wish to trigger
self.event = event
# Tokens to include in post
self.add_tokens = {}
if add_tokens:
self.add_tokens.update(add_tokens)
if isinstance(event_args, dict):
# Make a copy of the arguments so that they can't change
# outside of this plugin
self.event_args = event_args.copy()
# Tokens to remove
self.del_tokens = []
if del_tokens is not None:
if isinstance(del_tokens, (list, tuple, set)):
self.del_tokens = del_tokens
else:
# Force a dictionary
self.event_args = dict()
else:
raise TypeError(
'del_token must be a list; {} was provided'.format(
str(type(del_tokens))))
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform IFTTT Notification
"""
@ -134,72 +154,106 @@ class NotifyIFTTT(NotifyBase):
self.ifttt_default_type_key: notify_type,
}
# Update our payload using any other event_args specified
payload.update(self.event_args)
# Add any new tokens expected (this can also potentially override
# any entries defined above)
payload.update(self.add_tokens)
# Eliminate empty fields; users wishing to cancel the use of the
# self.ifttt_default_ entries can preset these keys to being
# empty so that they get caught here and removed.
payload = {x: y for x, y in payload.items() if y}
# Eliminate fields flagged for removal
payload = {x: y for x, y in payload.items()
if x not in self.del_tokens}
# URL to transmit content via
url = self.notify_url.format(
apikey=self.apikey,
event=self.event,
# Track our failures
error_count = 0
# Create a copy of our event lit
events = list(self.events)
while len(events):
# Retrive an entry off of our event list
event = events.pop(0)
# URL to transmit content via
url = self.notify_url.format(
webhook_id=self.webhook_id,
event=event,
)
self.logger.debug('IFTTT POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('IFTTT Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
self.logger.debug(
u"IFTTT HTTP response headers: %r" % r.headers)
self.logger.debug(
u"IFTTT HTTP response body: %r" % r.content)
if r.status_code != requests.codes.ok:
# We had a problem
try:
self.logger.warning(
'Failed to send IFTTT:%s '
'notification: %s (error=%s).' % (
event,
HTTP_ERROR_MAP[r.status_code],
r.status_code))
except KeyError:
self.logger.warning(
'Failed to send IFTTT:%s '
'notification (error=%s).' % (
event, r.status_code))
# self.logger.debug('Response Details: %s' % r.content)
error_count += 1
else:
self.logger.info(
'Sent IFTTT notification to Event %s.' % event)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending IFTTT:%s ' % (
event) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
error_count += 1
return (error_count == 0)
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
# Store any new key/value pairs added to our list
args.update({'+{}'.format(k): v for k, v in self.add_tokens})
args.update({'-{}'.format(k): '' for k in self.del_tokens})
return '{schema}://{webhook_id}@{events}/?{args}'.format(
schema=self.secure_protocol,
webhook_id=self.webhook_id,
events='/'.join([self.quote(x, safe='') for x in self.events]),
args=self.urlencode(args),
)
self.logger.debug('IFTTT POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('IFTTT Payload: %s' % str(payload))
try:
r = requests.post(
url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
self.logger.debug(
u"IFTTT HTTP response status: %r" % r.status_code)
self.logger.debug(
u"IFTTT HTTP response headers: %r" % r.headers)
self.logger.debug(
u"IFTTT HTTP response body: %r" % r.content)
if r.status_code != requests.codes.ok:
# We had a problem
try:
self.logger.warning(
'Failed to send IFTTT:%s '
'notification: %s (error=%s).' % (
self.event,
HTTP_ERROR_MAP[r.status_code],
r.status_code))
except KeyError:
self.logger.warning(
'Failed to send IFTTT:%s '
'notification (error=%s).' % (
self.event,
r.status_code))
# self.logger.debug('Response Details: %s' % r.content)
return False
else:
self.logger.info(
'Sent IFTTT notification to Event %s.' % self.event)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending IFTTT:%s ' % (
self.event) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
return False
return True
@staticmethod
def parse_url(url):
"""
@ -214,22 +268,14 @@ class NotifyIFTTT(NotifyBase):
return results
# Our Event
results['event'] = results['host']
results['events'] = list()
results['events'].append(results['host'])
# Our API Key
results['apikey'] = results['user']
results['webhook_id'] = results['user']
# Store ValueX entries based on each entry past the host
results['event_args'] = {
'{0}{1}'.format(NotifyIFTTT.ifttt_default_key_prefix, n + 1):
NotifyBase.unquote(x)
for n, x in enumerate(
NotifyBase.split_path(results['fullpath'])) if x}
# Allow users to set key=val parameters to specify more types
# of payload options
results['event_args'].update(
{k: NotifyBase.unquote(v)
for k, v in results['qsd'].items()})
# Now fetch the remaining tokens
results['events'].extend([x for x in filter(
bool, NotifyBase.split_path(results['fullpath']))][0:])
return results

View File

@ -29,6 +29,7 @@ from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import compat_is_basestring
@ -52,9 +53,17 @@ class NotifyJSON(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
def __init__(self, **kwargs):
# Disable throttle rate for JSON requests since they are normally
# local anyway
request_rate_per_sec = 0
def __init__(self, headers, **kwargs):
"""
Initialize JSON Object
headers can be a dictionary of key/value pairs that you want to
additionally include as part of the server headers to post with
"""
super(NotifyJSON, self).__init__(**kwargs)
@ -68,9 +77,51 @@ class NotifyJSON(NotifyBase):
if not compat_is_basestring(self.fullpath):
self.fullpath = '/'
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
return
def notify(self, title, body, notify_type, **kwargs):
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform JSON Notification
"""
@ -91,8 +142,8 @@ class NotifyJSON(NotifyBase):
'Content-Type': 'application/json'
}
if self.headers:
headers.update(self.headers)
# Apply any/all header over-rides defined
headers.update(self.headers)
auth = None
if self.user:
@ -108,6 +159,10 @@ class NotifyJSON(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('JSON Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -145,3 +200,23 @@ class NotifyJSON(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Add our headers that the user can potentially over-ride if they wish
# to to our returned result set
results['headers'] = results['qsd-']
results['headers'].update(results['qsd+'])
return results

View File

@ -39,6 +39,7 @@ import requests
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import compat_is_basestring
# Token required as part of the API request
@ -78,7 +79,7 @@ class NotifyJoin(NotifyBase):
service_url = 'https://joaoapps.com/join/'
# The default protocol
protocol = 'join'
secure_protocol = 'join'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_join'
@ -90,6 +91,10 @@ class NotifyJoin(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_72
# Limit results to just the first 2 line otherwise there is just to much
# content to display
body_max_line_count = 2
# The maximum allowable characters allowed in the body per message
body_maxlen = 1000
@ -126,22 +131,11 @@ class NotifyJoin(NotifyBase):
# Default to everyone
self.devices.append('group.all')
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Join Notification
"""
try:
# Limit results to just the first 2 line otherwise
# there is just to much content to display
body = re.split('[\r\n]+', body)
body[0] = body[0].strip('#').strip()
body = '\r\n'.join(body[0:2])
except (AttributeError, TypeError):
# body was None or not of a type string
body = ''
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded',
@ -188,6 +182,9 @@ class NotifyJoin(NotifyBase):
))
self.logger.debug('Join Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -227,12 +224,25 @@ class NotifyJoin(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
return_status = False
if len(devices):
# Prevent thrashing requests
self.throttle()
return return_status
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
return '{schema}://{apikey}/{devices}/?{args}'.format(
schema=self.secure_protocol,
apikey=self.quote(self.apikey, safe=''),
devices='/'.join([self.quote(x) for x in self.devices]),
args=self.urlencode(args))
@staticmethod
def parse_url(url):
"""

View File

@ -30,6 +30,7 @@ from time import time
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
# Token required as part of the API request
VALIDATE_TOKEN = re.compile(r'[A-Za-z0-9]{64}')
@ -112,7 +113,6 @@ class NotifyMatrix(NotifyBase):
if not self.user:
self.logger.warning(
'No user was specified; using %s.' % MATRIX_DEFAULT_USER)
self.user = MATRIX_DEFAULT_USER
if mode not in MATRIX_NOTIFICATION_MODES:
self.logger.warning('The mode specified (%s) is invalid.' % mode)
@ -135,7 +135,7 @@ class NotifyMatrix(NotifyBase):
re.IGNORECASE,
)
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Matrix Notification
"""
@ -170,6 +170,10 @@ class NotifyMatrix(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('Matrix Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -209,7 +213,7 @@ class NotifyMatrix(NotifyBase):
def __slack_mode_payload(self, title, body, notify_type):
# prepare JSON Object
payload = {
'username': self.user,
'username': self.user if self.user else MATRIX_DEFAULT_USER,
# Use Markdown language
'mrkdwn': True,
'attachments': [{
@ -230,13 +234,44 @@ class NotifyMatrix(NotifyBase):
msg = '<h4>%s</h4>%s<br/>' % (title, body)
payload = {
'displayName': self.user,
'displayName': self.user if self.user else MATRIX_DEFAULT_USER,
'format': 'html',
'text': msg,
}
return payload
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'mode': self.mode,
}
# Determine Authentication
auth = ''
if self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{host}/{token}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
host=self.host,
auth=auth,
token=self.token,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -30,6 +30,7 @@ from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..common import NotifyType
# Some Reference Locations:
# - https://docs.mattermost.com/developer/webhooks-incoming.html
@ -68,6 +69,9 @@ class NotifyMatterMost(NotifyBase):
# The maximum allowable characters allowed in the body per message
body_maxlen = 4000
# Mattermost does not have a title
title_maxlen = 0
def __init__(self, authtoken, channel=None, **kwargs):
"""
Initialize MatterMost Object
@ -108,7 +112,7 @@ class NotifyMatterMost(NotifyBase):
return
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform MatterMost Notification
"""
@ -120,7 +124,7 @@ class NotifyMatterMost(NotifyBase):
# prepare JSON Object
payload = {
'text': '###### %s\n%s' % (title, body),
'text': body,
'icon_url': self.image_url(notify_type),
}
@ -140,6 +144,10 @@ class NotifyMatterMost(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('MatterMost Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -179,6 +187,29 @@ class NotifyMatterMost(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
default_port = 443 if self.secure else self.default_port
default_schema = self.secure_protocol if self.secure else self.protocol
return '{schema}://{hostname}{port}/{authtoken}/?{args}'.format(
schema=default_schema,
hostname=self.host,
port='' if not self.port or self.port == default_port
else ':{}'.format(self.port),
authtoken=self.quote(self.authtoken, safe=''),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -28,6 +28,7 @@ import requests
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
# Used to validate API Key
VALIDATE_APIKEY = re.compile(r'[A-Za-z0-9]{40}')
@ -81,6 +82,10 @@ class NotifyProwl(NotifyBase):
# Prowl uses the http protocol with JSON requests
notify_url = 'https://api.prowlapp.com/publicapi/add'
# Disable throttle rate for Prowl requests since they are normally
# local anyway
request_rate_per_sec = 0
# The maximum allowable characters allowed in the body per message
body_maxlen = 10000
@ -124,7 +129,7 @@ class NotifyProwl(NotifyBase):
# Store the Provider Key
self.providerkey = providerkey
def notify(self, title, body, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Prowl Notification
"""
@ -150,6 +155,10 @@ class NotifyProwl(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('Prowl Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
@ -190,6 +199,35 @@ class NotifyProwl(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
ProwlPriority.LOW: 'low',
ProwlPriority.MODERATE: 'moderate',
ProwlPriority.NORMAL: 'normal',
ProwlPriority.HIGH: 'high',
ProwlPriority.EMERGENCY: 'emergency',
}
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'priority': 'normal' if self.priority not in _map
else _map[self.priority]
}
return '{schema}://{apikey}/{providerkey}/?{args}'.format(
schema=self.secure_protocol,
apikey=self.quote(self.apikey, safe=''),
providerkey='' if not self.providerkey
else self.quote(self.providerkey, safe=''),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
@ -216,15 +254,10 @@ class NotifyProwl(NotifyBase):
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
_map = {
'l': ProwlPriority.LOW,
'-2': ProwlPriority.LOW,
'm': ProwlPriority.MODERATE,
'-1': ProwlPriority.MODERATE,
'n': ProwlPriority.NORMAL,
'0': ProwlPriority.NORMAL,
'h': ProwlPriority.HIGH,
'1': ProwlPriority.HIGH,
'e': ProwlPriority.EMERGENCY,
'2': ProwlPriority.EMERGENCY,
}
try:
results['priority'] = \

View File

@ -30,7 +30,7 @@ from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from .NotifyBase import IS_EMAIL_RE
from ..common import NotifyType
from ..utils import compat_is_basestring
# Flag used as a placeholder to sending to all devices
@ -87,7 +87,7 @@ class NotifyPushBullet(NotifyBase):
if len(self.recipients) == 0:
self.recipients = (PUSHBULLET_SEND_TO_ALL, )
def notify(self, title, body, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform PushBullet Notification
"""
@ -135,6 +135,10 @@ class NotifyPushBullet(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('PushBullet Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
@ -176,12 +180,31 @@ class NotifyPushBullet(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
has_error = True
if len(recipients):
# Prevent thrashing requests
self.throttle()
return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
recipients = '/'.join([self.quote(x) for x in self.recipients])
if recipients == PUSHBULLET_SEND_TO_ALL:
# keyword is reserved for internal usage only; it's safe to remove
# it from the recipients list
recipients = ''
return '{schema}://{accesstoken}/{recipients}/?{args}'.format(
schema=self.secure_protocol,
accesstoken=self.quote(self.accesstoken, safe=''),
recipients=recipients,
args=self.urlencode(args))
@staticmethod
def parse_url(url):
"""

View File

@ -26,9 +26,11 @@
import re
import requests
from json import dumps
from itertools import chain
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
from ..utils import compat_is_basestring
# Used to detect and parse channels
@ -127,7 +129,7 @@ class NotifyPushed(NotifyBase):
return
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Pushed Notification
"""
@ -152,7 +154,7 @@ class NotifyPushed(NotifyBase):
if len(self.channels) + len(self.users) == 0:
# Just notify the app
return self.send_notification(
return self._send(
payload=payload, notify_type=notify_type, **kwargs)
# If our code reaches here, we want to target channels and users (by
@ -170,16 +172,12 @@ class NotifyPushed(NotifyBase):
# Get Channel
_payload['target_alias'] = channels.pop(0)
if not self.send_notification(
if not self._send(
payload=_payload, notify_type=notify_type, **kwargs):
# toggle flag
has_error = True
if len(channels) + len(users) > 0:
# Prevent thrashing requests
self.throttle()
# Copy our payload
_payload = dict(payload)
_payload['target_type'] = 'pushed_id'
@ -188,23 +186,20 @@ class NotifyPushed(NotifyBase):
while len(users):
# Get User's Pushed ID
_payload['pushed_id'] = users.pop(0)
if not self.send_notification(
if not self._send(
payload=_payload, notify_type=notify_type, **kwargs):
# toggle flag
has_error = True
if len(users) > 0:
# Prevent thrashing requests
self.throttle()
return not has_error
def send_notification(self, payload, notify_type, **kwargs):
def _send(self, payload, notify_type, **kwargs):
"""
A lower level call that directly pushes a payload to the Pushed
Notification servers. This should never be called directly; it is
referenced automatically through the notify() function.
referenced automatically through the send() function.
"""
headers = {
@ -216,6 +211,10 @@ class NotifyPushed(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('Pushed Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
@ -256,6 +255,30 @@ class NotifyPushed(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
return '{schema}://{app_key}/{app_secret}/{targets}/?{args}'.format(
schema=self.secure_protocol,
app_key=self.quote(self.app_key, safe=''),
app_secret=self.quote(self.app_secret, safe=''),
targets='/'.join(
[self.quote(x) for x in chain(
# Channels are prefixed with a pound/hashtag symbol
['#{}'.format(x) for x in self.channels],
# Users are prefixed with an @ symbol
['@{}'.format(x) for x in self.users],
)]),
args=self.urlencode(args))
@staticmethod
def parse_url(url):
"""

View File

@ -28,6 +28,7 @@ from .pushjet import errors
from .pushjet import pushjet
from ..NotifyBase import NotifyBase
from ...common import NotifyType
PUBLIC_KEY_RE = re.compile(
r'^[a-z0-9]{4}-[a-z0-9]{6}-[a-z0-9]{12}-[a-z0-9]{5}-[a-z0-9]{9}$', re.I)
@ -52,27 +53,35 @@ class NotifyPushjet(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_pushjet'
def __init__(self, **kwargs):
# Disable throttle rate for Pushjet requests since they are normally
# local anyway (the remote/online service is no more)
request_rate_per_sec = 0
def __init__(self, secret_key, **kwargs):
"""
Initialize Pushjet Object
"""
super(NotifyPushjet, self).__init__(**kwargs)
def notify(self, title, body, notify_type):
# store our key
self.secret_key = secret_key
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Pushjet Notification
"""
# Always call throttle before any remote server i/o is made
self.throttle()
server = "https://" if self.secure else "http://"
server += self.host
if self.port:
server += ":" + str(self.port)
try:
server = "http://"
if self.secure:
server = "https://"
server += self.host
if self.port:
server += ":" + str(self.port)
api = pushjet.Api(server)
service = api.Service(secret_key=self.user)
service = api.Service(secret_key=self.secret_key)
service.send(body, title)
self.logger.info('Sent Pushjet notification.')
@ -84,6 +93,28 @@ class NotifyPushjet(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
default_port = 443 if self.secure else 80
return '{schema}://{secret_key}@{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
secret_key=self.quote(self.secret_key, safe=''),
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
@ -91,10 +122,10 @@ class NotifyPushjet(NotifyBase):
us to substantiate this object.
Syntax:
pjet://secret@hostname
pjet://secret@hostname:port
pjets://secret@hostname
pjets://secret@hostname:port
pjet://secret_key@hostname
pjet://secret_key@hostname:port
pjets://secret_key@hostname
pjets://secret_key@hostname:port
"""
results = NotifyBase.parse_url(url)
@ -107,4 +138,7 @@ class NotifyPushjet(NotifyBase):
# a username is required
return None
# Store it as it's value
results['secret_key'] = results.get('user')
return results

View File

@ -26,9 +26,10 @@
import re
import requests
from ..utils import compat_is_basestring
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
from ..utils import compat_is_basestring
# Flag used as a placeholder to sending to all devices
PUSHOVER_SEND_TO_ALL = 'ALL_DEVICES'
@ -149,7 +150,7 @@ class NotifyPushover(NotifyBase):
'The user/group specified (%s) is invalid.' % self.user,
)
def notify(self, title, body, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Pushover Notification
"""
@ -189,6 +190,10 @@ class NotifyPushover(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('Pushover Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
@ -231,12 +236,44 @@ class NotifyPushover(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
has_error = True
if len(devices):
# Prevent thrashing requests
self.throttle()
return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
PushoverPriority.LOW: 'low',
PushoverPriority.MODERATE: 'moderate',
PushoverPriority.NORMAL: 'normal',
PushoverPriority.HIGH: 'high',
PushoverPriority.EMERGENCY: 'emergency',
}
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'priority':
_map[PushoverPriority.NORMAL] if self.priority not in _map
else _map[self.priority],
}
devices = '/'.join([self.quote(x) for x in self.devices])
if devices == PUSHOVER_SEND_TO_ALL:
# keyword is reserved for internal usage only; it's safe to remove
# it from the devices list
devices = ''
return '{schema}://{auth}{token}/{devices}/?{args}'.format(
schema=self.secure_protocol,
auth='' if not self.user
else '{user}@'.format(user=self.quote(self.user, safe='')),
token=self.quote(self.token, safe=''),
devices=devices,
args=self.urlencode(args))
@staticmethod
def parse_url(url):
"""

View File

@ -26,9 +26,11 @@
import re
import requests
from json import loads
from itertools import chain
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
from ..utils import compat_is_basestring
IS_CHANNEL = re.compile(r'^#(?P<name>[A-Za-z0-9]+)$')
@ -66,8 +68,11 @@ class NotifyRocketChat(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_rocketchat'
# Defines the maximum allowable characters in the title
title_maxlen = 200
# The title is not used
title_maxlen = 0
# The maximum size of the message
body_maxlen = 200
def __init__(self, recipients=None, **kwargs):
"""
@ -106,6 +111,12 @@ class NotifyRocketChat(NotifyBase):
elif not isinstance(recipients, (set, tuple, list)):
recipients = []
if not (self.user and self.password):
# Username & Password is required for Rocket Chat to work
raise TypeError(
'No Rocket.Chat user/pass combo specified.'
)
# Validate recipients and drop bad ones:
for recipient in recipients:
result = IS_CHANNEL.match(recipient)
@ -133,9 +144,44 @@ class NotifyRocketChat(NotifyBase):
# Used to track token headers upon authentication (if successful)
self.headers = {}
def notify(self, title, body, notify_type, **kwargs):
def url(self):
"""
wrapper to send_notification since we can alert more then one channel
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
# Determine Authentication
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/{targets}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
targets='/'.join(
[self.quote(x) for x in chain(
# Channels are prefixed with a pound/hashtag symbol
['#{}'.format(x) for x in self.channels],
# Rooms are as is
self.rooms,
)]),
args=self.urlencode(args),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
wrapper to _send since we can alert more then one channel
"""
# Track whether we authenticated okay
@ -143,8 +189,8 @@ class NotifyRocketChat(NotifyBase):
if not self.login():
return False
# Prepare our message
text = '*%s*\r\n%s' % (title.replace('*', '\\*'), body)
# Prepare our message using the body only
text = body
# Initiaize our error tracking
has_error = False
@ -157,7 +203,7 @@ class NotifyRocketChat(NotifyBase):
# Get Channel
channel = channels.pop(0)
if not self.send_notification(
if not self._send(
{
'text': text,
'channel': channel,
@ -166,16 +212,12 @@ class NotifyRocketChat(NotifyBase):
# toggle flag
has_error = True
if len(channels) + len(rooms) > 0:
# Prevent thrashing requests
self.throttle()
# Send all our defined room id's
while len(rooms):
# Get Room
room = rooms.pop(0)
if not self.send_notification(
if not self._send(
{
'text': text,
'roomId': room,
@ -184,16 +226,12 @@ class NotifyRocketChat(NotifyBase):
# toggle flag
has_error = True
if len(rooms) > 0:
# Prevent thrashing requests
self.throttle()
# logout
self.logout()
return not has_error
def send_notification(self, payload, notify_type, **kwargs):
def _send(self, payload, notify_type, **kwargs):
"""
Perform Notify Rocket.Chat Notification
"""
@ -202,6 +240,10 @@ class NotifyRocketChat(NotifyBase):
self.api_url + 'chat.postMessage', self.verify_certificate,
))
self.logger.debug('Rocket.Chat Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.api_url + 'chat.postMessage',

View File

@ -38,6 +38,7 @@ from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..common import NotifyType
# Token required as part of the API request
VALIDATE_TOKEN = re.compile(r'[A-Za-z0-9]{15}')
@ -141,7 +142,7 @@ class NotifyRyver(NotifyBase):
re.IGNORECASE,
)
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Ryver Notification
"""
@ -178,6 +179,10 @@ class NotifyRyver(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('Ryver Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -221,6 +226,33 @@ class NotifyRyver(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'webhook': self.webhook,
}
# Determine if there is a botname present
botname = ''
if self.user:
botname = '{botname}@'.format(
botname=self.quote(self.user, safe=''),
)
return '{schema}://{botname}{organization}/{token}/?{args}'.format(
schema=self.secure_protocol,
botname=botname,
organization=self.quote(self.organization, safe=''),
token=self.quote(self.token, safe=''),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -24,16 +24,17 @@
# THE SOFTWARE.
import re
import hmac
import requests
from hashlib import sha256
from datetime import datetime
from collections import OrderedDict
from xml.etree import ElementTree
from itertools import chain
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
from ..utils import compat_is_basestring
# Some Phone Number Detection
@ -58,8 +59,8 @@ LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
# region as a delimiter. This is a bit hacky; but it's much easier than having
# users of this product search though this Access Key Secret and escape all
# of the forward slashes!
IS_REGION = re.compile(r'^\s*(?P<country>[a-z]{2})-'
r'(?P<area>[a-z]+)-(?P<no>[0-9]+)\s*$', re.I)
IS_REGION = re.compile(
r'^\s*(?P<country>[a-z]{2})-(?P<area>[a-z]+)-(?P<no>[0-9]+)\s*$', re.I)
# Extend HTTP Error Messages
AWS_HTTP_ERROR_MAP = HTTP_ERROR_MAP.copy()
@ -85,10 +86,18 @@ class NotifySNS(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_sns'
# AWS is pretty good for handling data load so request limits
# can occur in much shorter bursts
request_rate_per_sec = 2.5
# The maximum length of the body
# Source: https://docs.aws.amazon.com/sns/latest/api/API_Publish.html
body_maxlen = 140
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
def __init__(self, access_key_id, secret_access_key, region_name,
recipients=None, **kwargs):
"""
@ -185,7 +194,7 @@ class NotifySNS(NotifyBase):
self.logger.warning(
'There are no valid recipient identified to notify.')
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
wrapper to send_notification since we can alert more then one channel
"""
@ -214,10 +223,6 @@ class NotifySNS(NotifyBase):
if not result:
error_count += 1
if len(phone) > 0:
# Prevent thrashing requests
self.throttle()
# Send all our defined topic id's
while len(topics):
@ -256,21 +261,24 @@ class NotifySNS(NotifyBase):
if not result:
error_count += 1
if len(topics) > 0:
# Prevent thrashing requests
self.throttle()
return error_count == 0
def _post(self, payload, to):
"""
Wrapper to request.post() to manage it's response better and make
the notify() function cleaner and easier to maintain.
the send() function cleaner and easier to maintain.
This function returns True if the _post was successful and False
if it wasn't.
"""
# Always call throttle before any remote server i/o is made; for AWS
# time plays a huge factor in the headers being sent with the payload.
# So for AWS (SNS) requests we must throttle before they're generated
# and not directly before the i/o call like other notification
# services do.
self.throttle()
# Convert our payload from a dict() into a urlencoded string
payload = self.urlencode(payload)
@ -282,6 +290,7 @@ class NotifySNS(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('AWS Payload: %s' % str(payload))
try:
r = requests.post(
self.notify_url,
@ -521,6 +530,33 @@ class NotifySNS(NotifyBase):
return response
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
return '{schema}://{key_id}/{key_secret}/{region}/{targets}/'\
'?{args}'.format(
schema=self.secure_protocol,
key_id=self.quote(self.aws_access_key_id, safe=''),
key_secret=self.quote(self.aws_secret_access_key, safe=''),
region=self.quote(self.aws_region_name, safe=''),
targets='/'.join(
[self.quote(x) for x in chain(
# Phone # are prefixed with a plus symbol
['+{}'.format(x) for x in self.phone],
# Topics are prefixed with a pound/hashtag symbol
['#{}'.format(x) for x in self.topics],
)]),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -43,6 +43,7 @@ from time import time
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import compat_is_basestring
# Token required as part of the API request
@ -141,7 +142,6 @@ class NotifySlack(NotifyBase):
if not self.user:
self.logger.warning(
'No user was specified; using %s.' % SLACK_DEFAULT_USER)
self.user = SLACK_DEFAULT_USER
if compat_is_basestring(channels):
self.channels = [x for x in filter(bool, CHANNEL_LIST_DELIM.split(
@ -175,7 +175,7 @@ class NotifySlack(NotifyBase):
re.IGNORECASE,
)
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Slack Notification
"""
@ -231,7 +231,7 @@ class NotifySlack(NotifyBase):
# prepare JSON Object
payload = {
'channel': _channel,
'username': self.user,
'username': self.user if self.user else SLACK_DEFAULT_USER,
# Use Markdown language
'mrkdwn': True,
'attachments': [{
@ -251,6 +251,9 @@ class NotifySlack(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('Slack Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -275,7 +278,7 @@ class NotifySlack(NotifyBase):
channel,
r.status_code))
# self.logger.debug('Response Details: %s' % r.raw.read())
# self.logger.debug('Response Details: %s' % r.content)
# Return; we're done
notify_okay = False
@ -291,12 +294,38 @@ class NotifySlack(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
notify_okay = False
if len(channels):
# Prevent thrashing requests
self.throttle()
return notify_okay
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
# Determine if there is a botname present
botname = ''
if self.user:
botname = '{botname}@'.format(
botname=self.quote(self.user, safe=''),
)
return '{schema}://{botname}{token_a}/{token_b}/{token_c}/{targets}/'\
'?{args}'.format(
schema=self.secure_protocol,
botname=botname,
token_a=self.quote(self.token_a, safe=''),
token_b=self.quote(self.token_b, safe=''),
token_c=self.quote(self.token_c, safe=''),
targets='/'.join(
[self.quote(x, safe='') for x in self.channels]),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -59,10 +59,11 @@ from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
from ..common import NotifyImageSize
from ..utils import compat_is_basestring
from ..utils import parse_bool
from ..common import NotifyFormat
from ..utils import parse_bool
from ..utils import parse_list
TELEGRAM_IMAGE_XY = NotifyImageSize.XY_256
@ -81,9 +82,6 @@ IS_CHAT_ID_RE = re.compile(
re.IGNORECASE,
)
# Used to break path apart into list of chat identifiers
CHAT_ID_LIST_DELIM = re.compile(r'[ \t\r\n,#\\/]+')
class NotifyTelegram(NotifyBase):
"""
@ -134,16 +132,8 @@ class NotifyTelegram(NotifyBase):
# Store our Bot Token
self.bot_token = result.group('key')
if compat_is_basestring(chat_ids):
self.chat_ids = [x for x in filter(bool, CHAT_ID_LIST_DELIM.split(
chat_ids,
))]
elif isinstance(chat_ids, (set, tuple, list)):
self.chat_ids = list(chat_ids)
else:
self.chat_ids = list()
# Parse our list
self.chat_ids = parse_list(chat_ids)
if self.user:
# Treat this as a channel too
@ -153,7 +143,7 @@ class NotifyTelegram(NotifyBase):
_id = self.detect_bot_owner()
if _id:
# Store our id
self.chat_ids = [str(_id)]
self.chat_ids.append(str(_id))
if len(self.chat_ids) == 0:
self.logger.warning('No chat_id(s) were specified.')
@ -336,7 +326,7 @@ class NotifyTelegram(NotifyBase):
return 0
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Telegram Notification
"""
@ -424,14 +414,14 @@ class NotifyTelegram(NotifyBase):
# ID
payload['chat_id'] = int(chat_id.group('idno'))
# Always call throttle before any remote server i/o is made;
# Telegram throttles to occur before sending the image so that
# content can arrive together.
self.throttle()
if self.include_image is True:
# Send an image
if self.send_image(
payload['chat_id'], notify_type) is not None:
# We sent a post (whether we were successful or not)
# we still hit the remote server... just throttle
# before our next hit server query
self.throttle()
self.send_image(payload['chat_id'], notify_type)
self.logger.debug('Telegram POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
@ -494,13 +484,28 @@ class NotifyTelegram(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
has_error = True
finally:
if len(chat_ids):
# Prevent thrashing requests
self.throttle()
return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
# No need to check the user token because the user automatically gets
# appended into the list of chat ids
return '{schema}://{bot_token}/{targets}/?{args}'.format(
schema=self.secure_protocol,
bot_token=self.quote(self.bot_token, safe=''),
targets='/'.join(
[self.quote('@{}'.format(x)) for x in self.chat_ids]),
args=self.urlencode(args))
@staticmethod
def parse_url(url):
"""
@ -512,17 +517,17 @@ class NotifyTelegram(NotifyBase):
# tgram:// messages since the bot_token has a colon in it.
# It invalidates an normal URL.
# This hack searches for this bogus URL and corrects it
# so we can properly load it further down. The other
# alternative is to ask users to actually change the colon
# into a slash (which will work too), but it's more likely
# to cause confusion... So this is the next best thing
# This hack searches for this bogus URL and corrects it so we can
# properly load it further down. The other alternative is to ask users
# to actually change the colon into a slash (which will work too), but
# it's more likely to cause confusion... So this is the next best thing
# we also check for %3A (incase the URL is encoded) as %3A == :
try:
tgram = re.match(
r'(?P<protocol>%s://)(bot)?(?P<prefix>([a-z0-9_-]+)'
r'(:[a-z0-9_-]+)?@)?(?P<btoken_a>[0-9]+):+'
r'(?P<remaining>.*)$' % NotifyTelegram.secure_protocol,
url, re.I)
r'(?P<protocol>{schema}://)(bot)?(?P<prefix>([a-z0-9_-]+)'
r'(:[a-z0-9_-]+)?@)?(?P<btoken_a>[0-9]+)(:|%3A)+'
r'(?P<remaining>.*)$'.format(
schema=NotifyTelegram.secure_protocol), url, re.I)
except (TypeError, AttributeError):
# url is bad; force tgram to be None
@ -534,14 +539,11 @@ class NotifyTelegram(NotifyBase):
if tgram.group('prefix'):
# Try again
results = NotifyBase.parse_url(
'%s%s%s/%s' % (
tgram.group('protocol'),
tgram.group('prefix'),
tgram.group('btoken_a'),
tgram.group('remaining'),
),
)
results = NotifyBase.parse_url('%s%s%s/%s' % (
tgram.group('protocol'),
tgram.group('prefix'),
tgram.group('btoken_a'),
tgram.group('remaining')))
else:
# Try again
@ -562,9 +564,8 @@ class NotifyTelegram(NotifyBase):
bot_token = '%s:%s' % (bot_token_a, bot_token_b)
chat_ids = ','.join(
[x for x in filter(
bool, NotifyBase.split_path(results['fullpath']))][1:])
chat_ids = [x for x in filter(
bool, NotifyBase.split_path(results['fullpath']))][1:]
# Store our bot token
results['bot_token'] = bot_token

View File

@ -25,6 +25,7 @@
from . import tweepy
from ..NotifyBase import NotifyBase
from ...common import NotifyType
class NotifyTwitter(NotifyBase):
@ -50,6 +51,9 @@ class NotifyTwitter(NotifyBase):
# which are limited to 240 characters)
body_maxlen = 4096
# Twitter does have titles when creating a message
title_maxlen = 0
def __init__(self, ckey, csecret, akey, asecret, **kwargs):
"""
Initialize Twitter Object
@ -90,7 +94,7 @@ class NotifyTwitter(NotifyBase):
return
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Twitter Notification
"""
@ -109,13 +113,16 @@ class NotifyTwitter(NotifyBase):
)
return False
text = '%s\r\n%s' % (title, body)
# Always call throttle before any remote server i/o is made to avoid
# thrashing the remote server and risk being blocked.
self.throttle()
try:
# Get our API
api = tweepy.API(self.auth)
# Send our Direct Message
api.send_direct_message(self.user, text=text)
api.send_direct_message(self.user, text=body)
self.logger.info('Sent Twitter DM notification.')
except Exception as e:

View File

@ -26,11 +26,11 @@
from __future__ import absolute_import
from __future__ import print_function
import re
from time import sleep
from .NotifyBase import NotifyBase
from ..common import NotifyImageSize
from ..common import NotifyType
# Default our global support flag
NOTIFY_WINDOWS_SUPPORT_ENABLED = False
@ -64,9 +64,17 @@ class NotifyWindows(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_windows'
# Disable throttle rate for Windows requests since they are normally
# local anyway
request_rate_per_sec = 0
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# Limit results to just the first 2 line otherwise there is just to much
# content to display
body_max_line_count = 2
# This entry is a bit hacky, but it allows us to unit-test this library
# in an environment that simply doesn't have the windows packages
# available to us. It also allows us to handle situations where the
@ -100,7 +108,7 @@ class NotifyWindows(NotifyBase):
return None
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Windows Notification
"""
@ -110,11 +118,8 @@ class NotifyWindows(NotifyBase):
"Windows Notifications are not supported by this system.")
return False
# Limit results to just the first 2 line otherwise
# there is just to much content to display
body = re.split('[\r\n]+', body)
body[0] = body[0].strip('#').strip()
body = '\r\n'.join(body[0:2])
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Register destruction callback
@ -168,13 +173,20 @@ class NotifyWindows(NotifyBase):
self.logger.info('Sent Windows notification.')
except Exception as e:
except Exception:
self.logger.warning('Failed to send Windows notification.')
self.logger.exception('Windows Exception')
return False
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
return '{schema}://'.format(schema=self.protocol)
@staticmethod
def parse_url(url):
"""

View File

@ -23,7 +23,6 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import requests
from json import dumps
@ -44,15 +43,28 @@ class NotifyXBMC(NotifyBase):
# The services URL
service_url = 'http://kodi.tv/'
xbmc_protocol = 'xbmc'
xbmc_secure_protocol = 'xbmcs'
kodi_protocol = 'kodi'
kodi_secure_protocol = 'kodis'
# The default protocols
protocol = ('xbmc', 'kodi')
protocol = (xbmc_protocol, kodi_protocol)
# The default secure protocols
secure_protocol = ('xbmc', 'kodis')
secure_protocol = (xbmc_secure_protocol, kodi_secure_protocol)
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_kodi'
# Disable throttle rate for XBMC/KODI requests since they are normally
# local anyway
request_rate_per_sec = 0
# Limit results to just the first 2 line otherwise there is just to much
# content to display
body_max_line_count = 2
# XBMC uses the http protocol with JSON requests
xbmc_default_port = 8080
@ -149,17 +161,11 @@ class NotifyXBMC(NotifyBase):
return (self.headers, dumps(payload))
def notify(self, title, body, notify_type, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform XBMC/KODI Notification
"""
# Limit results to just the first 2 line otherwise
# there is just to much content to display
body = re.split('[\r\n]+', body)
body[0] = body[0].strip('#').strip()
body = '\r\n'.join(body[0:2])
if self.protocol == self.xbmc_remote_protocol:
# XBMC v2.0
(headers, payload) = self._payload_20(
@ -184,6 +190,10 @@ class NotifyXBMC(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('XBMC/KODI Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -224,6 +234,45 @@ class NotifyXBMC(NotifyBase):
return True
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_schema = self.xbmc_protocol if (
self.protocol <= self.xbmc_remote_protocol) else self.kodi_protocol
default_port = 443 if self.secure else self.xbmc_default_port
if self.secure:
# Append 's' to schema
default_schema + 's'
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=default_schema,
auth=auth,
hostname=self.host,
port='' if not self.port or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""

View File

@ -29,6 +29,7 @@ import requests
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import compat_is_basestring
@ -52,9 +53,17 @@ class NotifyXML(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
def __init__(self, **kwargs):
# Disable throttle rate for JSON requests since they are normally
# local anyway
request_rate_per_sec = 0
def __init__(self, headers=None, **kwargs):
"""
Initialize XML Object
headers can be a dictionary of key/value pairs that you want to
additionally include as part of the server headers to post with
"""
super(NotifyXML, self).__init__(**kwargs)
@ -83,9 +92,51 @@ class NotifyXML(NotifyBase):
if not compat_is_basestring(self.fullpath):
self.fullpath = '/'
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
return
def notify(self, title, body, notify_type, **kwargs):
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
}
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.quote(self.password, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
args=self.urlencode(args),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform XML Notification
"""
@ -96,8 +147,8 @@ class NotifyXML(NotifyBase):
'Content-Type': 'application/xml'
}
if self.headers:
headers.update(self.headers)
# Apply any/all header over-rides defined
headers.update(self.headers)
re_map = {
'{MESSAGE_TYPE}': NotifyBase.quote(notify_type),
@ -126,6 +177,10 @@ class NotifyXML(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('XML Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -163,3 +218,23 @@ class NotifyXML(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Add our headers that the user can potentially over-ride if they wish
# to to our returned result set
results['headers'] = results['qsd-']
results['headers'].update(results['qsd+'])
return results

View File

@ -32,14 +32,12 @@ try:
from urllib import unquote
from urllib import quote
from urlparse import urlparse
from urlparse import parse_qsl
except ImportError:
# Python 3.x
from urllib.parse import unquote
from urllib.parse import quote
from urllib.parse import urlparse
from urllib.parse import parse_qsl
import logging
logger = logging.getLogger(__name__)
@ -91,6 +89,12 @@ TIDY_NUX_TRIM_RE = re.compile(
),
)
# The handling of custom arguments passed in the URL; we treat any
# argument (which would otherwise appear in the qsd area of our parse_url()
# function differently if they start with a + or - value
NOTIFY_CUSTOM_ADD_TOKENS = re.compile(r'^( |\+)(?P<key>.*)\s*')
NOTIFY_CUSTOM_DEL_TOKENS = re.compile(r'^-(?P<key>.*)\s*')
# Used for attempting to acquire the schema if the URL can't be parsed.
GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I)
@ -143,6 +147,81 @@ def tidy_path(path):
return path
def parse_qsd(qs):
"""
Query String Dictionary Builder
A custom implimentation of the parse_qsl() function already provided
by Python. This function is slightly more light weight and gives us
more control over parsing out arguments such as the plus/+ symbol
at the head of a key/value pair.
qs should be a query string part made up as part of the URL such as
a=1&c=2&d=
a=1 gets interpreted as { 'a': '1' }
a= gets interpreted as { 'a': '' }
a gets interpreted as { 'a': '' }
This function returns a result object that fits with the apprise
expected parameters (populating the 'qsd' portion of the dictionary
"""
# Our return result set:
result = {
# The arguments passed in (the parsed query). This is in a dictionary
# of {'key': 'val', etc }. Keys are all made lowercase before storing
# to simplify access to them.
'qsd': {},
# Detected Entries that start with + or - are additionally stored in
# these values (un-touched). The +/- however are stripped from their
# name before they are stored here.
'qsd+': {},
'qsd-': {},
}
pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
for name_value in pairs:
nv = name_value.split('=', 1)
# Handle case of a control-name with no equal sign
if len(nv) != 2:
nv.append('')
# Apprise keys can start with a + symbol; so we need to skip over
# the very first entry
key = '{}{}'.format(
'' if len(nv[0]) == 0 else nv[0][0],
'' if len(nv[0]) <= 1 else nv[0][1:].replace('+', ' '),
)
key = unquote(key)
key = '' if not key else key
val = nv[1].replace('+', ' ')
val = unquote(val)
val = '' if not val else val.strip()
# Always Query String Dictionary (qsd) for every entry we have
# content is always made lowercase for easy indexing
result['qsd'][key.lower().strip()] = val
# Check for tokens that start with a addition/plus symbol (+)
k = NOTIFY_CUSTOM_ADD_TOKENS.match(key)
if k is not None:
# Store content 'as-is'
result['qsd+'][k.group('key')] = val
# Check for tokens that start with a subtraction/hyphen symbol (-)
k = NOTIFY_CUSTOM_DEL_TOKENS.match(key)
if k is not None:
# Store content 'as-is'
result['qsd-'][k.group('key')] = val
return result
def parse_url(url, default_schema='http', verify_host=True):
"""A function that greatly simplifies the parsing of a url
specified by the end user.
@ -190,10 +269,17 @@ def parse_url(url, default_schema='http', verify_host=True):
'schema': None,
# The schema
'url': None,
# The arguments passed in (the parsed query)
# This is in a dictionary of {'key': 'val', etc }
# The arguments passed in (the parsed query). This is in a dictionary
# of {'key': 'val', etc }. Keys are all made lowercase before storing
# to simplify access to them.
# qsd = Query String Dictionary
'qsd': {}
'qsd': {},
# Detected Entries that start with + or - are additionally stored in
# these values (un-touched). The +/- however are stripped from their
# name before they are stored here.
'qsd+': {},
'qsd-': {},
}
qsdata = ''
@ -220,6 +306,11 @@ def parse_url(url, default_schema='http', verify_host=True):
# No qsdata
pass
# Parse Query Arugments ?val=key&key=val
# while ensuring that all keys are lowercase
if qsdata:
result.update(parse_qsd(qsdata))
# Now do a proper extraction of data
parsed = urlparse('http://%s' % host)
@ -231,6 +322,7 @@ def parse_url(url, default_schema='http', verify_host=True):
return None
result['fullpath'] = quote(unquote(tidy_path(parsed[2].strip())))
try:
# Handle trailing slashes removed by tidy_path
if result['fullpath'][-1] not in ('/', '\\') and \
@ -242,16 +334,6 @@ def parse_url(url, default_schema='http', verify_host=True):
# and therefore, no trailing slash
pass
# Parse Query Arugments ?val=key&key=val
# while ensureing that all keys are lowercase
if qsdata:
result['qsd'] = dict([(k.lower().strip(), v.strip())
for k, v in parse_qsl(
qsdata,
keep_blank_values=True,
strict_parsing=False,
)])
if not result['fullpath']:
# Default
result['fullpath'] = None
@ -397,6 +479,10 @@ def parse_list(*args):
elif isinstance(arg, (set, list, tuple)):
result += parse_list(*arg)
elif arg is None:
# Ignore
continue
else:
# Convert whatever it is to a string and work with it
result += parse_list(str(arg))

View File

@ -11,6 +11,12 @@ exclude = .eggs,.tox,gntp,tweepy,pushjet
ignore = E722,W503,W504
statistics = true
[flake8]
# We exclude packages we don't maintain
exclude = .eggs,.tox,gntp,tweepy,pushjet
ignore = E722,W503,W504
statistics = true
[aliases]
test=pytest

View File

@ -71,15 +71,34 @@ def test_apprise():
a = Apprise(servers=servers)
# 3 servers loaded
# 2 servers loaded
assert(len(a) == 2)
# We can retrieve our URLs this way:
assert(len(a.urls()) == 2)
# We can add another server
assert(
a.add('mmosts://mattermost.server.local/'
'3ccdd113474722377935511fc85d3dd4') is True)
assert(len(a) == 3)
# We can pop an object off of our stack by it's indexed value:
obj = a.pop(0)
assert(isinstance(obj, NotifyBase) is True)
assert(len(a) == 2)
# We can retrieve elements from our list too by reference:
assert(compat_is_basestring(a[0].url()) is True)
# We can iterate over our list too:
count = 0
for o in a:
assert(compat_is_basestring(o.url()) is True)
count += 1
# verify that we did indeed iterate over each element
assert(len(a) == count)
# We can empty our set
a.clear()
assert(len(a) == 0)

View File

@ -26,6 +26,7 @@
from apprise import plugins
from apprise import NotifyType
from apprise import Apprise
from apprise.utils import compat_is_basestring
from apprise.plugins import NotifyEmailBase
import smtplib
@ -49,7 +50,7 @@ TEST_URLS = (
# No Username
('mailtos://:pass@nuxref.com:567', {
# Can't prepare a To address using this expression
'exception': TypeError,
'instance': TypeError,
}),
# Pre-Configured Email Services
@ -115,27 +116,27 @@ TEST_URLS = (
}),
# Invalid From Address
('mailtos://user:pass@nuxref.com?from=@', {
'exception': TypeError,
'instance': TypeError,
}),
# Invalid From Address
('mailtos://nuxref.com?user=&pass=.', {
'exception': TypeError,
'instance': TypeError,
}),
# Invalid To Address
('mailtos://user:pass@nuxref.com?to=@', {
'exception': TypeError,
'instance': TypeError,
}),
# Valid URL, but can't structure a proper email
('mailtos://nuxref.com?user=%20!&pass=.', {
'exception': TypeError,
'instance': TypeError,
}),
# Invalid From (and To) Address
('mailtos://nuxref.com?to=test', {
'exception': TypeError,
'instance': TypeError,
}),
# Invalid Secure Mode
('mailtos://user:pass@example.com?mode=notamode', {
'exception': TypeError,
'instance': TypeError,
}),
# STARTTLS flag checking
('mailtos://user:pass@gmail.com?mode=starttls', {
@ -165,6 +166,8 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
API: NotifyEmail Plugin()
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# iterate over our dictionary and test it out
for (url, meta) in TEST_URLS:
@ -172,9 +175,6 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
# Our expected instance
instance = meta.get('instance', None)
# Our expected exception
exception = meta.get('exception', None)
# Our expected server objects
self = meta.get('self', None)
@ -217,19 +217,37 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
try:
obj = Apprise.instantiate(url, suppress_exceptions=False)
assert(exception is None)
if obj is None:
# We're done
# We're done (assuming this is what we were expecting)
assert instance is None
continue
if instance is None:
# Expected None but didn't get it
print('%s instantiated %s' % (url, str(obj)))
print('%s instantiated %s (but expected None)' % (
url, str(obj)))
assert(False)
assert(isinstance(obj, instance))
if isinstance(obj, plugins.NotifyBase.NotifyBase):
# We loaded okay; now lets make sure we can reverse this url
assert(compat_is_basestring(obj.url()) is True)
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
# Our object should be the same instance as what we had
# originally expected above.
if not isinstance(obj_cmp, plugins.NotifyBase.NotifyBase):
# Assert messages are hard to trace back with the way
# these tests work. Just printing before throwing our
# assertion failure makes things easier to debug later on
print('TEST FAIL: {} regenerated as {}'.format(
url, obj.url()))
assert(False)
if self:
# Iterate over our expected entries inside of our object
for key, val in self.items():
@ -256,18 +274,19 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
# Don't mess with these entries
raise
except Exception as e:
except Exception:
# We can't handle this exception type
print('%s / %s' % (url, str(e)))
assert False
raise
except AssertionError:
# Don't mess with these entries
print('%s AssertionError' % url)
raise
except Exception as e:
# Check that we were expecting this exception to happen
assert isinstance(e, response)
if not isinstance(e, response):
raise
except AssertionError:
# Don't mess with these entries
@ -276,9 +295,11 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
except Exception as e:
# Handle our exception
print('%s / %s' % (url, str(e)))
assert(exception is not None)
assert(isinstance(e, exception))
if(instance is None):
raise
if not isinstance(e, instance):
raise
@mock.patch('smtplib.SMTP')
@ -323,34 +344,23 @@ def test_smtplib_init_fail(mock_smtplib):
API: Test exception handling when calling smtplib.SMTP()
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
obj = Apprise.instantiate(
'mailto://user:pass@gmail.com', suppress_exceptions=False)
assert(isinstance(obj, plugins.NotifyEmail))
# Support Exception handling of smtplib.SMTP
mock_smtplib.side_effect = TypeError('Test')
mock_smtplib.side_effect = RuntimeError('Test')
try:
obj.notify(
title='test', body='body',
notify_type=NotifyType.INFO)
# We should have thrown an exception
assert False
except TypeError:
# Exception thrown as expected
assert True
except Exception:
# Un-Expected
assert False
assert obj.notify(
body='body', title='test', notify_type=NotifyType.INFO) is False
# A handled and expected exception
mock_smtplib.side_effect = smtplib.SMTPException('Test')
assert obj.notify(title='test', body='body',
notify_type=NotifyType.INFO) is False
assert obj.notify(
body='body', title='test', notify_type=NotifyType.INFO) is False
@mock.patch('smtplib.SMTP')
@ -359,6 +369,8 @@ def test_smtplib_send_okay(mock_smtplib):
API: Test a successfully sent email
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Defaults to HTML
obj = Apprise.instantiate(
@ -372,7 +384,7 @@ def test_smtplib_send_okay(mock_smtplib):
mock_smtplib.quit.return_value = True
assert(obj.notify(
title='test', body='body', notify_type=NotifyType.INFO) is True)
body='body', title='test', notify_type=NotifyType.INFO) is True)
# Set Text
obj = Apprise.instantiate(
@ -380,4 +392,4 @@ def test_smtplib_send_okay(mock_smtplib):
assert(isinstance(obj, plugins.NotifyEmail))
assert(obj.notify(
title='test', body='body', notify_type=NotifyType.INFO) is True)
body='body', title='test', notify_type=NotifyType.INFO) is True)

View File

@ -28,6 +28,7 @@ import mock
import sys
import types
import apprise
from apprise.utils import compat_is_basestring
try:
# Python v3.4+
@ -223,6 +224,9 @@ def test_dbus_plugin(mock_mainloop, mock_byte, mock_bytearray,
assert(isinstance(obj, apprise.plugins.NotifyDBus) is True)
obj.duration = 0
# Test url() call
assert(compat_is_basestring(obj.url()) is True)
# Our notification succeeds even though the gi library was not loaded
assert(obj.notify(title='title', body='body',
notify_type=apprise.NotifyType.INFO) is True)

View File

@ -28,6 +28,7 @@ import sys
import types
import apprise
from apprise.utils import compat_is_basestring
try:
# Python v3.4+
@ -113,6 +114,9 @@ def test_gnome_plugin():
# Check that it found our mocked environments
assert(obj._enabled is True)
# Test url() call
assert(compat_is_basestring(obj.url()) is True)
# test notifications
assert(obj.notify(title='title', body='body',
notify_type=apprise.NotifyType.INFO) is True)

View File

@ -26,8 +26,9 @@
from apprise import plugins
from apprise import NotifyType
from apprise import Apprise
from apprise.utils import compat_is_basestring
import mock
import re
TEST_URLS = (
@ -193,9 +194,8 @@ def test_growl_plugin(mock_gntp):
# This is the response we expect
assert True
except Exception as e:
except Exception:
# We can't handle this exception type
print('%s / %s' % (url, str(e)))
assert False
# We're done this part of the test
@ -216,10 +216,27 @@ def test_growl_plugin(mock_gntp):
if instance is None:
# Expected None but didn't get it
print('%s instantiated %s' % (url, str(obj)))
assert(False)
assert(isinstance(obj, instance))
assert(isinstance(obj, instance) is True)
if isinstance(obj, plugins.NotifyBase.NotifyBase):
# We loaded okay; now lets make sure we can reverse this url
assert(compat_is_basestring(obj.url()) is True)
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
# Our object should be the same instance as what we had
# originally expected above.
if not isinstance(obj_cmp, plugins.NotifyBase.NotifyBase):
# Assert messages are hard to trace back with the way
# these tests work. Just printing before throwing our
# assertion failure makes things easier to debug later on
print('TEST FAIL: {} regenerated as {}'.format(
url, obj.url()))
assert(False)
if self:
# Iterate over our expected entries inside of our object

View File

@ -23,6 +23,9 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from datetime import datetime
from datetime import timedelta
from apprise.plugins.NotifyBase import NotifyBase
from apprise import NotifyType
from apprise import NotifyImageSize
@ -45,6 +48,15 @@ def test_notify_base():
except TypeError:
assert(True)
# invalid types throw exceptions
try:
nb = NotifyBase(**{'overflow': 'invalid'})
# We should never reach here as an exception should be thrown
assert(False)
except TypeError:
assert(True)
# Bad port information
nb = NotifyBase(port='invalid')
assert nb.port is None
@ -52,9 +64,29 @@ def test_notify_base():
nb = NotifyBase(port=10)
assert nb.port == 10
try:
nb.url()
assert False
except NotImplementedError:
# Each sub-module is that inherits this as a parent is required to
# over-ride this function. So direct calls to this throws a not
# implemented error intentionally
assert True
try:
nb.send('test message')
assert False
except NotImplementedError:
# Each sub-module is that inherits this as a parent is required to
# over-ride this function. So direct calls to this throws a not
# implemented error intentionally
assert True
# Throttle overrides..
nb = NotifyBase()
nb.throttle_attempt = 0.0
nb.request_rate_per_sec = 0.0
start_time = default_timer()
nb.throttle()
elapsed = default_timer() - start_time
@ -63,13 +95,57 @@ def test_notify_base():
# then other
assert elapsed < 0.5
# Concurrent calls should achieve the same response
start_time = default_timer()
nb.throttle(1.0)
nb.throttle()
elapsed = default_timer() - start_time
assert elapsed < 0.5
nb = NotifyBase()
nb.request_rate_per_sec = 1.0
# Set our time to now
start_time = default_timer()
nb.throttle()
elapsed = default_timer() - start_time
# A first call to throttle (Without telling it a time previously ran) does
# not block for any length of time; it just merely sets us up for
# concurrent calls to block
assert elapsed < 0.5
# Concurrent calls could take up to the rate_per_sec though...
start_time = default_timer()
nb.throttle(last_io=datetime.now())
elapsed = default_timer() - start_time
assert elapsed > 0.5 and elapsed < 1.5
nb = NotifyBase()
nb.request_rate_per_sec = 1.0
# Set our time to now
start_time = default_timer()
nb.throttle(last_io=datetime.now())
elapsed = default_timer() - start_time
# because we told it that we had already done a previous action (now)
# the throttle holds out until the right time has passed
assert elapsed > 0.5 and elapsed < 1.5
# Concurrent calls could take up to the rate_per_sec though...
start_time = default_timer()
nb.throttle(last_io=datetime.now())
elapsed = default_timer() - start_time
assert elapsed > 0.5 and elapsed < 1.5
nb = NotifyBase()
start_time = default_timer()
nb.request_rate_per_sec = 1.0
# Force a time in the past
nb.throttle(last_io=(datetime.now() - timedelta(seconds=20)))
elapsed = default_timer() - start_time
# Should be a very fast response time since we set it to zero but we'll
# check for less then 500 to be fair as some testing systems may be slower
# then other
assert elapsed < 1.5
assert elapsed < 0.5
# our NotifyBase wasn't initialized with an ImageSize so this will fail
assert nb.image_url(notify_type=NotifyType.INFO) is None
@ -166,11 +242,30 @@ def test_notify_base_urls():
assert 'password' in results
assert results['password'] == "newpassword"
# pass headers
results = NotifyBase.parse_url(
'https://localhost:8080?-HeaderKey=HeaderValue')
assert 'headerkey' in results['headers']
assert results['headers']['headerkey'] == 'HeaderValue'
# Options
results = NotifyBase.parse_url('https://localhost?format=invalid')
assert 'format' not in results
results = NotifyBase.parse_url('https://localhost?format=text')
assert 'format' in results
assert results['format'] == 'text'
results = NotifyBase.parse_url('https://localhost?format=markdown')
assert 'format' in results
assert results['format'] == 'markdown'
results = NotifyBase.parse_url('https://localhost?format=html')
assert 'format' in results
assert results['format'] == 'html'
results = NotifyBase.parse_url('https://localhost?overflow=invalid')
assert 'overflow' not in results
results = NotifyBase.parse_url('https://localhost?overflow=upstream')
assert 'overflow' in results
assert results['overflow'] == 'upstream'
results = NotifyBase.parse_url('https://localhost?overflow=split')
assert 'overflow' in results
assert results['overflow'] == 'split'
results = NotifyBase.parse_url('https://localhost?overflow=truncate')
assert 'overflow' in results
assert results['overflow'] == 'truncate'
# User Handling

View File

@ -26,6 +26,8 @@
from apprise import plugins
from apprise import NotifyType
from apprise import Apprise
from apprise.utils import compat_is_basestring
import mock
TEST_URLS = (
@ -104,13 +106,37 @@ def test_plugin(mock_refresh, mock_send):
try:
obj = Apprise.instantiate(url, suppress_exceptions=False)
if instance is None:
# Check that we got what we came for
assert obj is instance
if obj is None:
# We're done (assuming this is what we were expecting)
assert instance is None
continue
if instance is None:
# Expected None but didn't get it
print('%s instantiated %s (but expected None)' % (
url, str(obj)))
assert(False)
assert(isinstance(obj, instance))
if isinstance(obj, plugins.NotifyBase.NotifyBase):
# We loaded okay; now lets make sure we can reverse this url
assert(compat_is_basestring(obj.url()) is True)
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
# Our object should be the same instance as what we had
# originally expected above.
if not isinstance(obj_cmp, plugins.NotifyBase.NotifyBase):
# Assert messages are hard to trace back with the way
# these tests work. Just printing before throwing our
# assertion failure makes things easier to debug later on
print('TEST FAIL: {} regenerated as {}'.format(
url, obj.url()))
assert(False)
if self:
# Iterate over our expected entries inside of our object
for key, val in self.items():
@ -142,23 +168,29 @@ def test_plugin(mock_refresh, mock_send):
# Don't mess with these entries
raise
except Exception as e:
except Exception:
# We can't handle this exception type
assert False
raise
except AssertionError:
# Don't mess with these entries
print('%s AssertionError' % url)
raise
except Exception as e:
# Check that we were expecting this exception to happen
assert isinstance(e, response)
if not isinstance(e, response):
raise
except AssertionError:
# Don't mess with these entries
print('%s AssertionError' % url)
raise
except Exception as e:
# Handle our exception
assert(instance is not None)
assert(isinstance(e, instance))
if(instance is None):
raise
if not isinstance(e, instance):
raise

File diff suppressed because it is too large Load Diff

View File

@ -303,6 +303,8 @@ def test_aws_topic_handling(mock_post):
API: NotifySNS Plugin() AWS Topic Handling
"""
# Disable Throttling to speed testing
plugins.NotifySNS.request_rate_per_sec = 0
arn_response = \
"""
@ -336,9 +338,6 @@ def test_aws_topic_handling(mock_post):
# Assign ourselves a new function
mock_post.side_effect = post
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Create our object
a = Apprise()

View File

@ -35,6 +35,26 @@ except ImportError:
from apprise import utils
def test_parse_qsd():
"utils: parse_qsd() testing """
result = utils.parse_qsd('a=1&b=&c&d=abcd')
assert(isinstance(result, dict) is True)
assert(len(result) == 3)
assert 'qsd' in result
assert 'qsd+' in result
assert 'qsd-' in result
assert(len(result['qsd']) == 4)
assert 'a' in result['qsd']
assert 'b' in result['qsd']
assert 'c' in result['qsd']
assert 'd' in result['qsd']
assert(len(result['qsd-']) == 0)
assert(len(result['qsd+']) == 0)
def test_parse_url():
"utils: parse_url() testing """
@ -49,6 +69,8 @@ def test_parse_url():
assert(result['query'] is None)
assert(result['url'] == 'http://hostname')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
result = utils.parse_url('http://hostname/')
assert(result['schema'] == 'http')
@ -61,6 +83,8 @@ def test_parse_url():
assert(result['query'] is None)
assert(result['url'] == 'http://hostname/')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
result = utils.parse_url('hostname')
assert(result['schema'] == 'http')
@ -73,6 +97,61 @@ def test_parse_url():
assert(result['query'] is None)
assert(result['url'] == 'http://hostname')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
result = utils.parse_url('http://hostname/?-KeY=Value')
assert(result['schema'] == 'http')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] == '/')
assert(result['path'] == '/')
assert(result['query'] is None)
assert(result['url'] == 'http://hostname/')
assert('-key' in result['qsd'])
assert(unquote(result['qsd']['-key']) == 'Value')
assert('KeY' in result['qsd-'])
assert(unquote(result['qsd-']['KeY']) == 'Value')
assert(result['qsd+'] == {})
result = utils.parse_url('http://hostname/?+KeY=Value')
assert(result['schema'] == 'http')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] == '/')
assert(result['path'] == '/')
assert(result['query'] is None)
assert(result['url'] == 'http://hostname/')
assert('+key' in result['qsd'])
assert('KeY' in result['qsd+'])
assert(result['qsd+']['KeY'] == 'Value')
assert(result['qsd-'] == {})
result = utils.parse_url(
'http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C')
assert(result['schema'] == 'http')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] == '/')
assert(result['path'] == '/')
assert(result['query'] is None)
assert(result['url'] == 'http://hostname/')
assert('+key' in result['qsd'])
assert('-key' in result['qsd'])
assert('key' in result['qsd'])
assert('KeY' in result['qsd+'])
assert(result['qsd+']['KeY'] == 'ValueA')
assert('kEy' in result['qsd-'])
assert(result['qsd-']['kEy'] == 'ValueB')
assert(result['qsd']['key'] == 'Value C')
assert(result['qsd']['+key'] == result['qsd+']['KeY'])
assert(result['qsd']['-key'] == result['qsd-']['kEy'])
result = utils.parse_url('http://hostname////')
assert(result['schema'] == 'http')
@ -85,6 +164,8 @@ def test_parse_url():
assert(result['query'] is None)
assert(result['url'] == 'http://hostname/')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
result = utils.parse_url('http://hostname:40////')
assert(result['schema'] == 'http')
@ -97,6 +178,8 @@ def test_parse_url():
assert(result['query'] is None)
assert(result['url'] == 'http://hostname:40/')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
result = utils.parse_url('HTTP://HoStNaMe:40/test.php')
assert(result['schema'] == 'http')
@ -109,6 +192,8 @@ def test_parse_url():
assert(result['query'] == 'test.php')
assert(result['url'] == 'http://HoStNaMe:40/test.php')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
result = utils.parse_url('HTTPS://user@hostname/test.py')
assert(result['schema'] == 'https')
@ -121,6 +206,8 @@ def test_parse_url():
assert(result['query'] == 'test.py')
assert(result['url'] == 'https://user@hostname/test.py')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
result = utils.parse_url(' HTTPS://///user@@@hostname///test.py ')
assert(result['schema'] == 'https')
@ -133,6 +220,8 @@ def test_parse_url():
assert(result['query'] == 'test.py')
assert(result['url'] == 'https://user@hostname/test.py')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
result = utils.parse_url(
'HTTPS://user:password@otherHost/full///path/name/',
@ -147,6 +236,8 @@ def test_parse_url():
assert(result['query'] is None)
assert(result['url'] == 'https://user:password@otherHost/full/path/name/')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
# Handle garbage
assert(utils.parse_url(None) is None)
@ -173,6 +264,8 @@ def test_parse_url():
assert(unquote(result['qsd']['from']) == 'test@test.com')
assert('format' in result['qsd'])
assert(unquote(result['qsd']['format']) == 'text')
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
# Test Passwords with question marks ?; not supported
result = utils.parse_url(
@ -194,6 +287,8 @@ def test_parse_url():
assert(result['query'] is None)
assert(result['url'] == 'http://nuxref.com')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
# just host and path
result = utils.parse_url(
@ -209,6 +304,8 @@ def test_parse_url():
assert(result['query'] == 'host')
assert(result['url'] == 'http://invalid/host')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
# just all out invalid
assert(utils.parse_url('?') is None)
@ -227,6 +324,8 @@ def test_parse_url():
assert(result['query'] is None)
assert(result['url'] == 'http://nuxref.com')
assert(result['qsd'] == {})
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
def test_parse_bool():

View File

@ -29,6 +29,7 @@ import types
# Rebuild our Apprise environment
import apprise
from apprise.utils import compat_is_basestring
try:
# Python v3.4+
@ -107,6 +108,9 @@ def test_windows_plugin():
obj = apprise.Apprise.instantiate('windows://', suppress_exceptions=False)
obj.duration = 0
# Test URL functionality
assert(compat_is_basestring(obj.url()) is True)
# Check that it found our mocked environments
assert(obj._enabled is True)

32
tox.ini
View File

@ -10,36 +10,45 @@ setenv =
deps=
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands = python -m pytest {posargs}
commands =
coverage run --parallel -m pytest {posargs}
flake8 . --count --show-source --statistics
[testenv:py27]
deps=
dbus-python
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands = coverage run --parallel -m pytest {posargs}
commands =
coverage run --parallel -m pytest {posargs}
flake8 . --count --show-source --statistics
[testenv:py34]
deps=
dbus-python
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands = coverage run --parallel -m pytest {posargs}
commands =
coverage run --parallel -m pytest {posargs}
flake8 . --count --show-source --statistics
[testenv:py35]
deps=
dbus-python
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands = coverage run --parallel -m pytest {posargs}
commands =
coverage run --parallel -m pytest {posargs}
flake8 . --count --show-source --statistics
[testenv:py36]
deps=
dbus-python
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands = coverage run --parallel -m pytest {posargs}
commands =
coverage run --parallel -m pytest {posargs}
flake8 . --count --show-source --statistics
[testenv:py37]
deps=
@ -47,21 +56,24 @@ deps=
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands =
flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics
coverage run --parallel -m pytest {posargs}
flake8 . --count --show-source --statistics
[testenv:pypy]
deps=
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands = coverage run --parallel -m pytest {posargs}
commands =
coverage run --parallel -m pytest {posargs}
flake8 . --count --show-source --statistics
[testenv:pypy3]
deps=
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands = coverage run --parallel -m pytest {posargs}
commands =
coverage run --parallel -m pytest {posargs}
flake8 . --count --show-source --statistics
[testenv:coverage-report]
deps = coverage