more unittesting + bugfixes

This commit is contained in:
Chris Caron 2017-12-10 21:28:00 -05:00
parent 6cca5946e1
commit 82c5a11e5b
22 changed files with 788 additions and 246 deletions

View File

@ -21,7 +21,7 @@ The table below identifies the services this tool supports and some example serv
| Notification Service | Service ID | Default Port | Example Syntax |
| -------------------- | ---------- | ------------ | -------------- |
| [Boxcar](https://github.com/caronc/apprise/wiki/Notify_boxcar) | boxcar:// | (TCP) 443 | boxcar://hostname<br />boxcar://hostname/@tag<br/>boxcar://hostname/device_token<br />boxcar://hostname/device_token1/device_token2/device_tokenN<br />boxcar://hostname/alias<br />boxcar://hostname/@tag/@tag2/alias/device_token
| [Boxcar](https://github.com/caronc/apprise/wiki/Notify_boxcar) | boxcar:// | (TCP) 443 | boxcar://hostname<br />boxcar://hostname/@tag<br/>boxcar://hostname/device_token<br />boxcar://hostname/device_token1/device_token2/device_tokenN<br />boxcar://hostname/@tag/@tag2/device_token
| [Faast](https://github.com/caronc/apprise/wiki/Notify_faast) | faast:// | (TCP) 443 | faast://authorizationtoken
| [Growl](https://github.com/caronc/apprise/wiki/Notify_growl) | growl:// | (UDP) 23053 | growl://hostname<br />growl://hostname:portno<br />growl://password@hostname<br />growl://password@hostname:port</br>_Note: you can also use the get parameter _version_ which can allow the growl request to behave using the older v1.x protocol. An example would look like: growl://hostname?version=1
| [Join](https://github.com/caronc/apprise/wiki/Notify_join) | join:// | (TCP) 443 | join://apikey/device<br />join://apikey/device1/device2/deviceN/<br />join://apikey/group<br />join://apikey/groupA/groupB/groupN<br />join://apikey/DeviceA/groupA/groupN/DeviceN/

View File

@ -238,10 +238,10 @@ class Apprise(object):
# Toggle our return status flag
status = False
except:
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
# TODO: print backtrace
logging.exception("notification exception")
status = False
return status

View File

@ -70,10 +70,10 @@ class AppriseAsset(object):
if theme:
self.theme = theme
if image_path_mask:
if image_path_mask is not None:
self.image_path_mask = image_path_mask
if image_url_mask:
if image_url_mask is not None:
self.image_url_mask = image_url_mask
def html_color(self, notify_type):
@ -89,6 +89,10 @@ class AppriseAsset(object):
Apply our mask to our image URL
"""
if not self.image_url_mask:
# No image to return
return None
re_map = {
'{THEME}': self.theme if self.theme else '',
'{TYPE}': notify_type,
@ -108,6 +112,11 @@ class AppriseAsset(object):
Apply our mask to our image file path
"""
if not self.image_path_mask:
# No image to return
return None
re_map = {
'{THEME}': self.theme if self.theme else '',
'{TYPE}': notify_type,

View File

@ -121,7 +121,7 @@ class NotifyBase(object):
def __init__(self, title_maxlen=100, body_maxlen=512,
notify_format=NotifyFormat.TEXT, image_size=None,
include_image=False, secure=False, throttle=None, **kwargs):
secure=False, throttle=None, **kwargs):
"""
Initialize some general logging and common server arguments that will
keep things consistent when working with the notifiers that will
@ -152,7 +152,6 @@ class NotifyBase(object):
self.title_maxlen = title_maxlen
self.body_maxlen = body_maxlen
self.image_size = image_size
self.include_image = include_image
self.secure = secure
if isinstance(throttle, (float, int)):
@ -256,6 +255,9 @@ class NotifyBase(object):
common unquote function
"""
if not content:
return ''
try:
# Python v3.x
return _unquote(content, encoding=encoding, errors=errors)
@ -270,6 +272,9 @@ class NotifyBase(object):
common quote function
"""
if not content:
return ''
try:
# Python v3.x
return _quote(content, safe=safe, encoding=encoding, errors=errors)
@ -292,7 +297,7 @@ class NotifyBase(object):
except TypeError:
# Python v2.7
return _urlencode(query, oseq=doseq)
return _urlencode(query)
@staticmethod
def split_path(path, unquote=True):
@ -322,12 +327,13 @@ class NotifyBase(object):
return is_hostname(hostname)
@staticmethod
def parse_url(url):
def parse_url(url, verify_host=True):
"""
Parses the URL and returns it broken apart into a dictionary.
"""
results = parse_url(url, default_schema='unknown')
results = parse_url(
url, default_schema='unknown', verify_host=verify_host)
if not results:
# We're done; we failed to parse our url

View File

@ -19,86 +19,137 @@
from json import dumps
import requests
import re
from time import time
import hmac
from hashlib import sha1
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from .NotifyBase import NotifyBase
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
from ..utils import compat_is_basestring
# Used to validate Tags, Aliases and Devices
IS_TAG = re.compile(r'^[A-Za-z0-9]{1,63}$')
IS_ALIAS = re.compile(r'^[@]?[A-Za-z0-9]+$')
IS_DEVICETOKEN = re.compile(r'^[A-Za-z0-9]{64}$')
# Default to sending to all devices if nothing is specified
DEFAULT_TAG = '@all'
# The tags value is an structure containing an array of strings defining the
# list of tagged devices that the notification need to be send to, and a
# boolean operator (and / or) that defines the criteria to match devices
# against those tags.
IS_TAG = re.compile(r'^[@](?P<name>[A-Z0-9]{1,63})$', re.I)
# Device tokens are only referenced when developing.
# it's not likely you'll send a message directly to a device, but
# if you do; this plugin supports it.
IS_DEVICETOKEN = re.compile(r'^[A-Z0-9]{64}$', re.I)
# Both an access key and seret key are created and assigned to each project
# you create on the boxcar website
VALIDATE_ACCESS = re.compile(r'[A-Z0-9_-]{64}', re.I)
VALIDATE_SECRET = re.compile(r'[A-Z0-9_-]{64}', re.I)
# Used to break apart list of potential tags by their delimiter
# into a usable list.
TAGS_LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
# Image Support (72x72)
BOXCAR_IMAGE_XY = NotifyImageSize.XY_72
class NotifyBoxcar(NotifyBase):
"""
A wrapper for Boxcar Notifications
"""
# The default simple (insecure) protocol
protocol = 'boxcar'
# All boxcar notifications are secure
secure_protocol = 'boxcar'
# The default secure protocol
secure_protocol = 'boxcars'
# Boxcar URL
notify_url = 'https://boxcar-api.io/api/push/'
def __init__(self, recipients=None, **kwargs):
def __init__(self, access, secret, recipients=None, **kwargs):
"""
Initialize Boxcar Object
"""
super(NotifyBoxcar, self).__init__(
title_maxlen=250, body_maxlen=10000, **kwargs)
if self.secure:
self.schema = 'https'
else:
self.schema = 'http'
title_maxlen=250, body_maxlen=10000,
image_size=BOXCAR_IMAGE_XY, **kwargs)
# Initialize tag list
self.tags = list()
# Initialize alias list
self.aliases = list()
# Initialize device_token list
self.device_tokens = list()
if recipients is None:
try:
# Access Key (associated with project)
self.access = access.strip()
except AttributeError:
self.logger.warning(
'The specified access key specified is invalid.',
)
raise TypeError(
'The specified access key specified is invalid.',
)
try:
# Secret Key (associated with project)
self.secret = secret.strip()
except AttributeError:
self.logger.warning(
'The specified secret key specified is invalid.',
)
raise TypeError(
'The specified secret key specified is invalid.',
)
if not VALIDATE_ACCESS.match(self.access):
self.logger.warning(
'The access key specified (%s) is invalid.' % self.access,
)
raise TypeError(
'The access key specified (%s) is invalid.' % self.access,
)
if not VALIDATE_SECRET.match(self.secret):
self.logger.warning(
'The secret key specified (%s) is invalid.' % self.secret,
)
raise TypeError(
'The secret key specified (%s) is invalid.' % self.secret,
)
if not recipients:
self.tags.append(DEFAULT_TAG)
recipients = []
elif compat_is_basestring(recipients):
recipients = filter(bool, TAGS_LIST_DELIM.split(
recipients = [x for x in filter(bool, TAGS_LIST_DELIM.split(
recipients,
))
elif not isinstance(recipients, (set, tuple, list)):
recipients = []
))]
# Validate recipients and drop bad ones:
for recipient in recipients:
if IS_DEVICETOKEN.match(recipient):
if IS_TAG.match(recipient):
# store valid tag/alias
self.tags.append(IS_TAG.match(recipient).group('name'))
elif IS_DEVICETOKEN.match(recipient):
# store valid device
self.device_tokens.append(recipient)
elif IS_TAG.match(recipient):
# store valid tag
self.tags.append(recipient)
elif IS_ALIAS.match(recipient):
# store valid tag/alias
self.aliases.append(recipient)
else:
self.logger.warning(
'Dropped invalid tag/alias/device_token '
'(%s) specified.' % recipient,
)
continue
def notify(self, title, body, notify_type, **kwargs):
"""
@ -112,62 +163,87 @@ class NotifyBoxcar(NotifyBase):
# prepare Boxcar Object
payload = {
'badge': 'auto',
'alert': '%s:\r\n%s' % (title, body),
'aps': {
'badge': 'auto',
'alert': '',
},
'expires': str(int(time() + 30)),
}
if self.tags:
payload['tags'] = self.tags
if title:
payload['aps']['@title'] = title
if self.aliases:
payload['aliases'] = self.aliases
if body:
payload['aps']['alert'] = body
if self.tags:
payload['tags'] = {'or': self.tags}
if self.device_tokens:
payload['device_tokens'] = self.device_tokens
auth = None
if self.user:
auth = (self.user, self.password)
# Source picture should be <= 450 DP wide, ~2:1 aspect.
image_url = self.image_url(notify_type)
if image_url:
# Set our image
payload['@img'] = image_url
url = '%s://%s' % (self.schema, self.host)
if isinstance(self.port, int):
url += ':%d' % self.port
# Acquire our hostname
host = urlparse(self.notify_url).hostname
url += '/api/push'
# Calculate signature.
str_to_sign = "%s\n%s\n%s\n%s" % (
"POST", host, "/api/push", dumps(payload))
h = hmac.new(
bytearray(self.secret, 'utf-8'),
bytearray(str_to_sign, 'utf-8'),
sha1,
)
params = self.urlencode({
"publishkey": self.access,
"signature": h.hexdigest(),
})
notify_url = '%s?%s' % (self.notify_url, params)
self.logger.debug('Boxcar POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
notify_url, self.verify_certificate,
))
self.logger.debug('Boxcar Payload: %s' % str(payload))
try:
r = requests.post(
url,
notify_url,
data=dumps(payload),
headers=headers,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# Boxcar returns 201 (Created) when successful
if r.status_code != requests.codes.created:
try:
self.logger.warning(
'Failed to send Boxcar notification: '
'%s (error=%s).' % (
HTTP_ERROR_MAP[r.status_code],
r.status_code))
except KeyError:
self.logger.warning(
'Failed to send Boxcar notification '
'(error=%s).' % (
r.status_code))
# self.logger.debug('Response Details: %s' % r.raw.read())
# Return; we're done
return False
except requests.ConnectionError as e:
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Boxcar '
'notification to %s.' % (
self.host))
'notification to %s.' % (host))
self.logger.debug('Socket Exception: %s' % str(e))
@ -182,21 +258,39 @@ class NotifyBoxcar(NotifyBase):
Parses the URL and returns it broken apart into a dictionary.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early
return None
# Acquire our recipients and include them in the response
# The first token is stored in the hostnamee
access = results['host']
# Now fetch the remaining tokens
try:
recipients = NotifyBase.unquote(results['fullpath'])
secret = NotifyBase.split_path(results['fullpath'])[0]
except (AttributeError, KeyError):
# no recipients detected
recipients = ''
except (AttributeError, IndexError):
# Force a bad value that will get caught in parsing later
secret = None
# Store our recipients
results['recipients'] = recipients
try:
recipients = ','.join(
NotifyBase.split_path(results['fullpath'])[1:])
except (AttributeError, IndexError):
# Default to not having any recipients
recipients = None
if not (access and secret):
# If we did not recive an access and/or secret code
# then we're done
return None
# Store our required content
results['recipients'] = recipients if recipients else None
results['access'] = access
results['secret'] = secret
return results

View File

@ -189,7 +189,7 @@ class NotifyEmail(NotifyBase):
# over-riding any smarts to be applied
return
for i in range(len(WEBBASE_LOOKUP_TABLE)):
for i in range(len(WEBBASE_LOOKUP_TABLE)): # pragma: no branch
self.logger.debug('Scanning %s against %s' % (
self.to_addr, WEBBASE_LOOKUP_TABLE[i][0]
))
@ -290,13 +290,10 @@ class NotifyEmail(NotifyBase):
# Return; we're done
return False
try:
finally:
# Gracefully terminate the connection with the server
socket.quit()
except:
# no problem
pass
return True
@staticmethod
@ -332,17 +329,11 @@ class NotifyEmail(NotifyBase):
else:
# get 'To' email address
try:
to_addr = '%s@%s' % (
re.split(
'[\s@]+', NotifyBase.unquote(results['user']))[0],
results.get('host', '')
)
except (AttributeError, IndexError):
# No problem, we have other ways of getting
# the To address
pass
to_addr = '%s@%s' % (
re.split(
'[\s@]+', NotifyBase.unquote(results['user']))[0],
results.get('host', '')
)
# Attempt to detect 'from' email address
from_addr = to_addr

View File

@ -64,12 +64,9 @@ class NotifyFaast(NotifyBase):
'message': body,
}
if self.include_image:
image_url = self.image_url(
notify_type,
)
if image_url:
payload['icon_url'] = image_url
image_url = self.image_url(notify_type)
if image_url:
payload['icon_url'] = image_url
self.logger.debug('Faast POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,

View File

@ -140,13 +140,13 @@ class NotifyGrowl(NotifyBase):
body = '\r\n'.join(body[0:2])
icon = None
if self.include_image:
if self.version >= 2:
# URL Based
icon = self.image_url(notify_type)
else:
# Raw
icon = self.image_raw(notify_type)
if self.version >= 2:
# URL Based
icon = self.image_url(notify_type)
else:
# Raw
icon = self.image_raw(notify_type)
payload = {
'noteType': GROWL_NOTIFICATION_TYPE,

View File

@ -113,8 +113,7 @@ class NotifyJSON(NotifyBase):
except KeyError:
self.logger.warning(
'Failed to send JSON notification '
'(error=%s).' % (
r.status_code))
'(error=%s).' % (r.status_code))
# Return; we're done
return False

View File

@ -148,12 +148,9 @@ class NotifyJoin(NotifyBase):
'text': body,
}
if self.include_image:
image_url = self.image_url(
notify_type,
)
if image_url:
url_args['icon'] = image_url
image_url = self.image_url(notify_type)
if image_url:
url_args['icon'] = image_url
# prepare payload
payload = {}

View File

@ -45,9 +45,6 @@ class NotifyPushBullet(NotifyBase):
A wrapper for PushBullet Notifications
"""
# The default protocol
protocol = 'pbul'
# The default secure protocol
secure_protocol = 'pbul'

View File

@ -92,12 +92,9 @@ class NotifyPushalot(NotifyBase):
'Source': self.app_id,
}
if self.include_image:
image_url = self.image_url(
notify_type,
)
if image_url:
payload['Image'] = image_url
image_url = self.image_url(notify_type)
if image_url:
payload['Image'] = image_url
self.logger.debug('Pushalot POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,

View File

@ -22,7 +22,7 @@
# follow the wizard to pre-determine the channel(s) you want your
# message to broadcast to, and when you're complete, you will
# recieve a URL that looks something like this:
# https://hooks.slack.com/services/T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7F
# https://hooks.slack.com/services/T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ
# ^ ^ ^
# | | |
# These are important <--------------^---------^---------------^
@ -59,11 +59,11 @@ SLACK_HTTP_ERROR_MAP.update({
401: 'Unauthorized - Invalid Token.',
})
# Used to break path apart into list of devices
# Used to break path apart into list of channels
CHANNEL_LIST_DELIM = re.compile(r'[ \t\r\n,#\\/]+')
# Used to detect a device
IS_CHANNEL_RE = re.compile(r'#?([A-Za-z0-9_]{1,32})')
# Used to detect a channel
IS_CHANNEL_RE = re.compile(r'[+#@]?([A-Z0-9_]{1,32})', re.I)
# Image Support (72x72)
SLACK_IMAGE_XY = NotifyImageSize.XY_72
@ -127,11 +127,13 @@ class NotifySlack(NotifyBase):
self.user = SLACK_DEFAULT_USER
if compat_is_basestring(channels):
self.channels = filter(bool, CHANNEL_LIST_DELIM.split(
self.channels = [x for x in filter(bool, CHANNEL_LIST_DELIM.split(
channels,
))
))]
elif isinstance(channels, (set, tuple, list)):
self.channels = channels
else:
self.channels = list()
@ -167,13 +169,13 @@ class NotifySlack(NotifyBase):
}
# error tracking (used for function return)
has_error = False
notify_okay = True
# Perform Formatting
title = self._re_formatting_rules.sub(
title = self._re_formatting_rules.sub( # pragma: no branch
lambda x: self._re_formatting_map[x.group()], title,
)
body = self._re_formatting_rules.sub(
body = self._re_formatting_rules.sub( # pragma: no branch
lambda x: self._re_formatting_map[x.group()], body,
)
url = '%s/%s/%s/%s' % (
@ -183,11 +185,7 @@ class NotifySlack(NotifyBase):
self.token_c,
)
image_url = None
if self.include_image:
image_url = self.image_url(
notify_type,
)
image_url = self.image_url(notify_type)
# Create a copy of the channel list
channels = list(self.channels)
@ -204,9 +202,11 @@ class NotifySlack(NotifyBase):
if len(channel) > 1 and channel[0] == '+':
# Treat as encoded id if prefixed with a +
_channel = channel[1:]
elif len(channel) > 1 and channel[0] == '@':
# Treat @ value 'as is'
_channel = channel
else:
# Prefix with channel hash tag
_channel = '#%s' % channel
@ -220,7 +220,7 @@ class NotifySlack(NotifyBase):
'attachments': [{
'title': title,
'text': body,
'color': self.asset.html_color[notify_type],
'color': self.asset.html_color(notify_type),
# Time
'ts': time(),
'footer': self.app_id,
@ -251,7 +251,7 @@ class NotifySlack(NotifyBase):
SLACK_HTTP_ERROR_MAP[r.status_code],
r.status_code))
except IndexError:
except KeyError:
self.logger.warning(
'Failed to send Slack:%s '
'notification (error=%s).' % (
@ -261,21 +261,21 @@ class NotifySlack(NotifyBase):
# self.logger.debug('Response Details: %s' % r.raw.read())
# Return; we're done
has_error = True
notify_okay = False
except requests.ConnectionError as e:
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Slack:%s ' % (
channel) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
has_error = True
notify_okay = False
if len(channels):
# Prevent thrashing requests
self.throttle()
return has_error
return notify_okay
@staticmethod
def parse_url(url):
@ -297,23 +297,15 @@ class NotifySlack(NotifyBase):
# Now fetch the remaining tokens
try:
token_b, token_c = filter(
bool, NotifyBase.split_path(results['fullpath']))[0:2]
token_b, token_c = [x for x in filter(
bool, NotifyBase.split_path(results['fullpath']))][0:2]
except (AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
token_b = None
token_c = None
except (ValueError, AttributeError, IndexError):
# We're done
return None
try:
channels = '#'.join(filter(
bool, NotifyBase.split_path(results['fullpath']))[2:])
except (AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
channels = None
channels = [x for x in filter(
bool, NotifyBase.split_path(results['fullpath']))][2:]
results['token_a'] = token_a
results['token_b'] = token_b

View File

@ -218,18 +218,17 @@ class NotifyTelegram(NotifyBase):
has_error = False
image_url = None
if self.include_image:
image_content = self.image_raw(notify_type)
if image_content is not None:
# prepare our image URL
image_url = '%s%s/%s' % (
self.notify_url,
self.bot_token,
'sendPhoto'
)
image_content = self.image_raw(notify_type)
if image_content is not None:
# prepare our image URL
image_url = '%s%s/%s' % (
self.notify_url,
self.bot_token,
'sendPhoto'
)
# Set up our upload
files = {'photo': ('%s.png' % notify_type, image_content)}
# Set up our upload
files = {'photo': ('%s.png' % notify_type, image_content)}
url = '%s%s/%s' % (
self.notify_url,

View File

@ -90,12 +90,9 @@ class NotifyToasty(NotifyBase):
'text': NotifyBase.quote(body),
}
if self.include_image:
image_url = self.image_url(
notify_type,
)
if image_url:
payload['image'] = image_url
image_url = self.image_url(notify_type)
if image_url:
payload['image'] = image_url
# URL to transmit content via
url = '%s%s' % (self.notify_url, device)

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# XBMC Notify Wrapper
# XBMC/KODI Notify Wrapper
#
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
@ -28,17 +28,6 @@ from ..common import NotifyImageSize
# Image Support (128x128)
XBMC_IMAGE_XY = NotifyImageSize.XY_128
# XBMC uses v2
XBMC_PROTOCOL_V2 = 2
# Kodi uses v6
XBMC_PROTOCOL_V6 = 6
SUPPORTED_XBMC_PROTOCOLS = (
XBMC_PROTOCOL_V2,
XBMC_PROTOCOL_V6,
)
class NotifyXBMC(NotifyBase):
"""
@ -52,7 +41,13 @@ class NotifyXBMC(NotifyBase):
secure_protocol = ('xbmc', 'kodis')
# XBMC uses the http protocol with JSON requests
default_port = 8080
xbmc_default_port = 8080
# XBMC default protocol version (v2)
xbmc_remote_protocol = 2
# KODI default protocol version (v6)
kodi_remote_protocol = 6
def __init__(self, **kwargs):
"""
@ -68,14 +63,8 @@ class NotifyXBMC(NotifyBase):
else:
self.schema = 'http'
if not self.port:
self.port = self.default_port
self.protocol = kwargs.get('protocol', XBMC_PROTOCOL_V2)
if self.protocol not in SUPPORTED_XBMC_PROTOCOLS:
raise TypeError("Invalid protocol specified.")
return
# Default protocol
self.protocol = kwargs.get('protocol', self.xbmc_remote_protocol)
def _payload_60(self, title, body, notify_type, **kwargs):
"""
@ -102,18 +91,17 @@ class NotifyXBMC(NotifyBase):
'id': 1,
}
if self.include_image:
image_url = self.image_url(
notify_type,
)
if image_url:
payload['image'] = image_url
if notify_type is NotifyType.Error:
payload['type'] = 'error'
elif notify_type is NotifyType.Warning:
payload['type'] = 'warning'
else:
payload['type'] = 'info'
image_url = self.image_url(notify_type)
if image_url:
payload['image'] = image_url
if notify_type is NotifyType.FAILURE:
payload['type'] = 'error'
elif notify_type is NotifyType.WARNING:
payload['type'] = 'warning'
else:
payload['type'] = 'info'
return (headers, dumps(payload))
@ -142,18 +130,15 @@ class NotifyXBMC(NotifyBase):
'id': 1,
}
if self.include_image:
image_url = self.image_url(
notify_type,
)
if image_url:
payload['image'] = image_url
image_url = self.image_url(notify_type)
if image_url:
payload['image'] = image_url
return (headers, dumps(payload))
def notify(self, title, body, notify_type, **kwargs):
"""
Perform XBMC Notification
Perform XBMC/KODI Notification
"""
# Limit results to just the first 2 line otherwise
@ -162,13 +147,13 @@ class NotifyXBMC(NotifyBase):
body[0] = body[0].strip('#').strip()
body = '\r\n'.join(body[0:2])
if self.protocol == XBMC_PROTOCOL_V2:
if self.protocol == self.xbmc_remote_protocol:
# XBMC v2.0
(headers, payload) = self._payload_20(
title, body, notify_type, **kwargs)
else:
# XBMC v6.0
# KODI v6.0
(headers, payload) = self._payload_60(
title, body, notify_type, **kwargs)
@ -177,7 +162,7 @@ class NotifyXBMC(NotifyBase):
auth = (self.user, self.password)
url = '%s://%s' % (self.schema, self.host)
if isinstance(self.port, int):
if self.port:
url += ':%d' % self.port
url += '/jsonrpc'
@ -214,7 +199,7 @@ class NotifyXBMC(NotifyBase):
else:
self.logger.info('Sent XBMC/KODI notification.')
except requests.ConnectionError as e:
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending XBMC/KODI '
'notification.'
@ -225,3 +210,31 @@ class NotifyXBMC(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early
return results
# We want to set our protocol depending on whether we're using XBMC
# or KODI
if results.get('schema', '').startswith('xbmc'):
# XBMC Support
results['protocol'] = NotifyXBMC.xbmc_remote_protocol
# Assign Default XBMC Port
if not results['port']:
results['port'] = NotifyXBMC.xbmc_default_port
else:
# KODI Support
results['protocol'] = NotifyXBMC.kodi_remote_protocol
return results

View File

@ -136,7 +136,7 @@ class NotifyXML(NotifyBase):
# Return; we're done
return False
except requests.ConnectionError as e:
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending XML '
'notification to %s.' % self.host)

View File

@ -133,7 +133,7 @@ def tidy_path(path):
return path
def parse_url(url, default_schema='http'):
def parse_url(url, default_schema='http', verify_host=True):
"""A function that greatly simplifies the parsing of a url
specified by the end user.
@ -293,7 +293,7 @@ def parse_url(url, default_schema='http'):
if result['port'] == 0:
result['port'] = None
if not is_hostname(result['host']):
if verify_host and not is_hostname(result['host']):
# Nothing more we can do without a hostname
return None

View File

@ -27,19 +27,11 @@ import click
import logging
import sys
from apprise import Apprise
from apprise import AppriseAsset
from apprise import NotifyType
import apprise
# Logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
logger = logging.getLogger('apprise.plugins.NotifyBase')
# Defines our click context settings adding -h to the additional options that
# can be specified to get the help menu to come up
@ -60,41 +52,58 @@ def print_help_msg(command):
help='Specify the message title.')
@click.option('--body', '-b', default=None, type=str,
help='Specify the message body.')
@click.option('--notification-type', '-t', default=NotifyType.INFO, type=str,
@click.option('--notification-type', '-n', default=NotifyType.INFO, type=str,
metavar='TYPE', help='Specify the message type (default=info).')
@click.option('--theme', '-T', default='default', type=str,
help='Specify the default theme.')
@click.option('-v', '--verbose', count=True)
@click.argument('urls', nargs=-1,
metavar='SERVER_URL [SERVER_URL2 [SERVER_URL3]]',)
def _main(title, body, urls, notification_type, theme):
def _main(title, body, urls, notification_type, theme, verbose):
"""
Send a notification to all of the specified servers identified by their
URLs the content provided within the title, body and notification-type.
"""
# Logging
ch = logging.StreamHandler(sys.stdout)
if verbose > 2:
logger.setLevel(logging.DEBUG)
elif verbose == 1:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.NONE)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
if not urls:
logger.error('You must specify at least one server URL.')
print_help_msg(_main)
return 1
# Prepare our asset
asset = AppriseAsset(theme=theme)
asset = apprise.AppriseAsset(theme=theme)
# Create our object
apprise = Apprise(asset=asset)
a = apprise.Apprise(asset=asset)
# Load our inventory up
for url in urls:
apprise.add(url)
a.add(url)
if body is None:
# if no body was specified, then read from STDIN
body = click.get_text_stream('stdin').read()
# now print it out
apprise.notify(title=title, body=body, notify_type=notification_type)
return 0
if a.notify(title=title, body=body, notify_type=notification_type):
return 0
return 1
if __name__ == '__main__':

View File

@ -316,3 +316,13 @@ def test_apprise_asset(tmpdir):
# Restore our permissions
chmod(a.image_path(NotifyType.INFO, NotifyImageSize.XY_256), 0o640)
# Disable all image references
a = AppriseAsset(image_path_mask=False, image_url_mask=False)
# We always return none in these calls now
assert(a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None)
assert(a.image_url(NotifyType.INFO, NotifyImageSize.XY_256) is None)
assert(a.image_path(NotifyType.INFO, NotifyImageSize.XY_256,
must_exist=False) is None)
assert(a.image_path(NotifyType.INFO, NotifyImageSize.XY_256,
must_exist=True) is None)

View File

@ -78,7 +78,7 @@ def test_notify_base():
# Create an object with an ImageSize loaded into it
nb = NotifyBase(image_size=NotifyImageSize.XY_256)
# We'll get an object thi time around
# We'll get an object this time around
assert nb.image_url(notify_type=NotifyType.INFO) is not None
assert nb.image_path(notify_type=NotifyType.INFO) is not None
assert nb.image_raw(notify_type=NotifyType.INFO) is not None
@ -104,9 +104,13 @@ def test_notify_base():
'/path/?name=Dr%20Disrespect', unquote=True) == \
['path', '?name=Dr', 'Disrespect']
# Test is_email
assert NotifyBase.is_email('test@gmail.com') is True
assert NotifyBase.is_email('invalid.com') is False
# Test is_hostname
assert NotifyBase.is_hostname('example.com') is True
def test_notify_base_urls():
"""

View File

@ -19,11 +19,74 @@
from apprise import plugins
from apprise import NotifyType
from apprise import Apprise
from apprise import AppriseAsset
import requests
import mock
VALID_URLS = (
TEST_URLS = (
##################################
# NotifyBoxcar
##################################
('boxcar://', {
'instance': None,
}),
# No secret specified
('boxcar://%s' % ('a' * 64), {
'instance': None,
}),
# An invalid access and secret key specified
('boxcar://access.key/secret.key/', {
'instance': plugins.NotifyBoxcar,
# Thrown because there were no recipients specified
'exception': TypeError,
}),
# Provide both an access and a secret
('boxcar://%s/%s' % ('a' * 64, 'b' * 64), {
'instance': plugins.NotifyBoxcar,
'requests_response_code': requests.codes.created,
}),
# Test without image set
('boxcar://%s/%s' % ('a' * 64, 'b' * 64), {
'instance': plugins.NotifyBoxcar,
'requests_response_code': requests.codes.created,
# don't include an image by default
'include_image': False,
}),
# our access, secret and device are all 64 characters
# which is what we're doing here
('boxcar://%s/%s/@tag1/tag2///%s/' % (
'a' * 64, 'b' * 64, 'd' * 64), {
'instance': plugins.NotifyBoxcar,
'requests_response_code': requests.codes.created,
}),
# An invalid tag
('boxcar://%s/%s/@%s' % ('a' * 64, 'b' * 64, 't' * 64), {
'instance': plugins.NotifyBoxcar,
'requests_response_code': requests.codes.created,
}),
('boxcar://:@/', {
'instance': None,
}),
('boxcar://%s/%s/' % ('a' * 64, 'b' * 64), {
'instance': plugins.NotifyBoxcar,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('boxcar://%s/%s/' % ('a' * 64, 'b' * 64), {
'instance': plugins.NotifyBoxcar,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('boxcar://%s/%s/' % ('a' * 64, 'b' * 64), {
'instance': plugins.NotifyBoxcar,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
##################################
# NotifyJSON
##################################
@ -64,7 +127,7 @@ VALID_URLS = (
'instance': plugins.NotifyJSON,
# force a failure
'response': False,
'requests_response_code': 500,
'requests_response_code': requests.codes.internal_server_error,
}),
('json://user:pass@localhost:8082', {
'instance': plugins.NotifyJSON,
@ -79,6 +142,76 @@ VALID_URLS = (
'test_requests_exceptions': True,
}),
##################################
# NotifyKODI
##################################
('kodi://', {
'instance': None,
}),
('kodis://', {
'instance': None,
}),
('kodi://localhost', {
'instance': plugins.NotifyXBMC,
}),
('kodi://user:pass@localhost', {
'instance': plugins.NotifyXBMC,
}),
('kodi://localhost:8080', {
'instance': plugins.NotifyXBMC,
}),
('kodi://user:pass@localhost:8080', {
'instance': plugins.NotifyXBMC,
}),
('kodis://localhost', {
'instance': plugins.NotifyXBMC,
}),
('kodis://user:pass@localhost', {
'instance': plugins.NotifyXBMC,
}),
('kodis://localhost:8080/path/', {
'instance': plugins.NotifyXBMC,
}),
('kodis://user:pass@localhost:8080', {
'instance': plugins.NotifyXBMC,
}),
('kodi://localhost', {
'instance': plugins.NotifyXBMC,
# Experement with different notification types
'notify_type': NotifyType.WARNING,
}),
('kodi://localhost', {
'instance': plugins.NotifyXBMC,
# Experement with different notification types
'notify_type': NotifyType.FAILURE,
}),
('kodis://localhost:443', {
'instance': plugins.NotifyXBMC,
# don't include an image by default
'include_image': False,
}),
('kodi://:@/', {
'instance': None,
}),
('kodi://user:pass@localhost:8081', {
'instance': plugins.NotifyXBMC,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('kodi://user:pass@localhost:8082', {
'instance': plugins.NotifyXBMC,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('kodi://user:pass@localhost:8083', {
'instance': plugins.NotifyXBMC,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
##################################
# NotifyMatterMost
##################################
@ -123,7 +256,7 @@ VALID_URLS = (
'instance': plugins.NotifyMatterMost,
# force a failure
'response': False,
'requests_response_code': 500,
'requests_response_code': requests.codes.internal_server_error,
}),
('mmost://localhost/3ccdd113474722377935511fc85d3dd4', {
'instance': plugins.NotifyMatterMost,
@ -137,6 +270,184 @@ VALID_URLS = (
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
##################################
# NotifySlack
##################################
('slack://', {
'instance': None,
}),
('slack://:@/', {
'instance': None,
}),
('slack://T1JJ3T3L2', {
# Just Token 1 provided
'instance': None,
}),
('slack://T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#hmm/#-invalid-', {
# No username specified; this is still okay as we sub in
# default; The one invalid channel is skipped when sending a message
'instance': plugins.NotifySlack,
}),
('slack://T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#channel', {
# No username specified; this is still okay as we sub in
# default; The one invalid channel is skipped when sending a message
'instance': plugins.NotifySlack,
# don't include an image by default
'include_image': False,
}),
('slack://T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/+id/%20/@id/', {
# + encoded id,
# @ userid
'instance': plugins.NotifySlack,
}),
('slack://username@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#nuxref', {
'instance': plugins.NotifySlack,
}),
('slack://username@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ', {
# Missing a channel
'exception': TypeError,
}),
('slack://username@INVALID/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#cool', {
# invalid 1st Token
'exception': TypeError,
}),
('slack://username@T1JJ3T3L2/INVALID/TIiajkdnlazkcOXrIdevi7FQ/#great', {
# invalid 2rd Token
'exception': TypeError,
}),
('slack://username@T1JJ3T3L2/A1BRTD4JD/INVALID/#channel', {
# invalid 3rd Token
'exception': TypeError,
}),
('slack://l2g@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#usenet', {
'instance': plugins.NotifySlack,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('slack://respect@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#a', {
'instance': plugins.NotifySlack,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('slack://notify@T1JJ3T3L2/A1BRTD4JD/TIiajkdnlazkcOXrIdevi7FQ/#b', {
'instance': plugins.NotifySlack,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
##################################
# NotifyKODI
##################################
('xbmc://', {
'instance': None,
}),
('xbmc://localhost', {
'instance': plugins.NotifyXBMC,
}),
('xbmc://user:pass@localhost', {
'instance': plugins.NotifyXBMC,
}),
('xbmc://localhost:8080', {
'instance': plugins.NotifyXBMC,
}),
('xbmc://user:pass@localhost:8080', {
'instance': plugins.NotifyXBMC,
}),
('xbmc://localhost', {
'instance': plugins.NotifyXBMC,
# don't include an image by default
'include_image': False,
}),
('xbmc://localhost', {
'instance': plugins.NotifyXBMC,
# Experement with different notification types
'notify_type': NotifyType.WARNING,
}),
('xbmc://localhost', {
'instance': plugins.NotifyXBMC,
# Experement with different notification types
'notify_type': NotifyType.FAILURE,
}),
('xbmc://:@/', {
'instance': None,
}),
('xbmc://user:pass@localhost:8081', {
'instance': plugins.NotifyXBMC,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('xbmc://user:pass@localhost:8082', {
'instance': plugins.NotifyXBMC,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('xbmc://user:pass@localhost:8083', {
'instance': plugins.NotifyXBMC,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
##################################
# NotifyXML
##################################
('xml://', {
'instance': None,
}),
('xmls://', {
'instance': None,
}),
('xml://localhost', {
'instance': plugins.NotifyXML,
}),
('xml://user:pass@localhost', {
'instance': plugins.NotifyXML,
}),
('xml://localhost:8080', {
'instance': plugins.NotifyXML,
}),
('xml://user:pass@localhost:8080', {
'instance': plugins.NotifyXML,
}),
('xmls://localhost', {
'instance': plugins.NotifyXML,
}),
('xmls://user:pass@localhost', {
'instance': plugins.NotifyXML,
}),
('xmls://localhost:8080/path/', {
'instance': plugins.NotifyXML,
}),
('xmls://user:pass@localhost:8080', {
'instance': plugins.NotifyXML,
}),
('xml://:@/', {
'instance': None,
}),
('xml://user:pass@localhost:8081', {
'instance': plugins.NotifyXML,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('xml://user:pass@localhost:8082', {
'instance': plugins.NotifyXML,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('xml://user:pass@localhost:8083', {
'instance': plugins.NotifyXML,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
@ -149,7 +460,7 @@ def test_rest_plugins(mock_post, mock_get):
"""
# iterate over our dictionary and test it out
for (url, meta) in VALID_URLS:
for (url, meta) in TEST_URLS:
# Our expected instance
instance = meta.get('instance', None)
@ -166,7 +477,23 @@ def test_rest_plugins(mock_post, mock_get):
# Allow us to force the server response code to be something other then
# the defaults
requests_response_code = meta.get(
'requests_response_code', 200 if response else 404)
'requests_response_code',
requests.codes.ok if response else requests.codes.not_found,
)
# Allow notification type override, otherwise default to INFO
notify_type = meta.get('notify_type', NotifyType.INFO)
# Whether or not we should include an image with our request; unless
# otherwise specified, we assume that images are to be included
include_image = meta.get('include_image', True)
if include_image:
# a default asset
asset = AppriseAsset()
else:
# Disable images
asset = AppriseAsset(image_path_mask=False, image_url_mask=False)
test_requests_exceptions = meta.get(
'test_requests_exceptions', False)
@ -197,7 +524,8 @@ def test_rest_plugins(mock_post, mock_get):
)
try:
obj = Apprise.instantiate(url, suppress_exceptions=False)
obj = Apprise.instantiate(
url, asset=asset, suppress_exceptions=False)
assert(exception is None)
@ -224,7 +552,7 @@ def test_rest_plugins(mock_post, mock_get):
# check that we're as expected
assert obj.notify(
title='test', body='body',
notify_type=NotifyType.INFO) == response
notify_type=notify_type) == response
else:
for exception in test_requests_exceptions:
@ -255,7 +583,7 @@ def test_rest_plugins(mock_post, mock_get):
except AssertionError:
# Don't mess with these entries
print('%s / %s' % (url, str(e)))
print('%s AssertionError' % url)
raise
except Exception as e:
@ -263,3 +591,106 @@ def test_rest_plugins(mock_post, mock_get):
print('%s / %s' % (url, str(e)))
assert(exception is not None)
assert(isinstance(e, exception))
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_boxcar_plugin(mock_post, mock_get):
"""
API: NotifyBoxcar() Extra Checks
"""
# Generate some generic message types
device = 'A' * 64
tag = '@B' * 63
access = '-' * 64
secret = '_' * 64
# Initializes the plugin with recipients set to None
plugins.NotifyBoxcar(access=access, secret=secret, recipients=None)
# Initializes the plugin with a valid access, but invalid access key
try:
plugins.NotifyBoxcar(access=None, secret=secret, recipients=None)
assert(False)
except TypeError:
# We should throw an exception for knowingly having an invalid
assert(True)
# Initializes the plugin with a valid access, but invalid secret key
try:
plugins.NotifyBoxcar(access=access, secret='invalid', recipients=None)
assert(False)
except TypeError:
# We should throw an exception for knowingly having an invalid key
assert(True)
# Initializes the plugin with a valid access, but invalid secret
try:
plugins.NotifyBoxcar(access=access, secret=None, recipients=None)
assert(False)
except TypeError:
# We should throw an exception for knowingly having an invalid
assert(True)
# Initializes the plugin with recipients list
# the below also tests our the variation of recipient types
plugins.NotifyBoxcar(
access=access, secret=secret, recipients=[device, tag])
mock_get.return_value = requests.Request()
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.created
mock_get.return_value.status_code = requests.codes.created
# Test notifications without a body or a title
p = plugins.NotifyBoxcar(access=access, secret=secret, recipients=None)
p.notify(body=None, title=None, notify_type=NotifyType.INFO) is True
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_slack_plugin(mock_post, mock_get):
"""
API: NotifySlack() Extra Checks
"""
# Initialize some generic (but valid) tokens
token_a = 'A' * 9
token_b = 'B' * 9
token_c = 'c' * 24
# Support strings
channels = 'chan1,#chan2,+id,@user,,,'
obj = plugins.NotifySlack(
token_a=token_a, token_b=token_b, token_c=token_c, channels=channels)
assert(len(obj.channels) == 4)
mock_get.return_value = requests.Request()
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
mock_get.return_value.status_code = requests.codes.ok
# Empty Channel list
try:
plugins.NotifySlack(
token_a=token_a, token_b=token_b, token_c=token_c,
channels=None)
assert(False)
except TypeError:
# we'll thrown because an empty list of channels was provided
assert(True)
# Test include_image
obj = plugins.NotifySlack(
token_a=token_a, token_b=token_b, token_c=token_c, channels=channels,
include_image=True)
# This call includes an image with it's payload:
assert obj.notify(title='title', body='body',
notify_type=NotifyType.INFO) is True