massive refactoring; pep8 enhancments refs #1

This commit is contained in:
Chris Caron 2017-11-28 23:14:51 -05:00
parent a8f57a8b6e
commit 88ea283b95
51 changed files with 1727 additions and 742 deletions

View File

@ -1,18 +1,43 @@
# -*- coding: utf-8 -*-
#
# Apprise Core
#
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
# apprise is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# apprise is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
import re
import logging
from .common import NotifyType
from .common import NOTIFY_TYPES
from .utils import parse_list
from .AppriseAsset import AppriseAsset
from . import plugins
from .Utils import parse_url
from .Utils import parse_list
from .Utils import parse_bool
logger = logging.getLogger(__name__)
# Build a list of supported plugins
SCHEMA_MAP = {}
# Used for attempting to acquire the schema if the URL can't be parsed.
GET_SCHEMA_RE = re.compile('\s*(?P<schema>[a-z0-9]+)://.*$', re.I)
# Load our Lookup Matrix
def __load_matrix():
@ -23,20 +48,33 @@ def __load_matrix():
"""
# to add it's mapping to our hash table
for entry in dir(plugins):
# Get our plugin
plugin = getattr(plugins, entry)
proto = getattr(plugin, 'PROTOCOL', None)
protos = getattr(plugin, 'SECURE_PROTOCOL', None)
if not proto:
# Must have at least PROTOCOL defined
continue
# Load protocol(s) if defined
proto = getattr(plugin, 'protocol', None)
if isinstance(proto, basestring):
if proto not in SCHEMA_MAP:
SCHEMA_MAP[proto] = plugin
if proto not in SCHEMA_MAP:
SCHEMA_MAP[proto] = plugin
elif isinstance(proto, (set, list, tuple)):
# Support iterables list types
for p in proto:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
if protos and protos not in SCHEMA_MAP:
SCHEMA_MAP[protos] = plugin
# Load secure protocol(s) if defined
protos = getattr(plugin, 'secure_protocol', None)
if isinstance(protos, basestring):
if protos not in SCHEMA_MAP:
SCHEMA_MAP[protos] = plugin
if isinstance(protos, (set, list, tuple)):
# Support iterables list types
for p in protos:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
# Dynamically build our module
@ -48,25 +86,40 @@ class Apprise(object):
Our Notification Manager
"""
def __init__(self, servers=None):
def __init__(self, servers=None, asset=None):
"""
Loads a set of server urls
Loads a set of server urls while applying the Asset() module to each
if specified.
If no asset is provided, then the default asset is used.
"""
# Initialize a server list of URLs
self.servers = list()
# Assigns an central asset object that will be later passed into each
# notification plugin. Assets contain information such as the local
# directory images can be found in. It can also identify remote
# URL paths that contain the images you want to present to the end
# user. If no asset is specified, then the default one is used.
self.asset = asset
if asset is None:
# Load our default configuration
self.asset = AppriseAsset()
if servers:
self.add(servers)
def add(self, servers, include_image=True, image_url=None,
image_path=None):
def add(self, servers, asset=None):
"""
Adds one or more server URLs into our list.
"""
# Initialize our return status
return_status = True
servers = parse_list(servers)
for _server in servers:
@ -75,87 +128,58 @@ class Apprise(object):
# pushbullet)
_server = _server.replace('/#', '/%23')
# Parse our url details
# the server object is a dictionary containing all of the
# information parsed from our URL
server = parse_url(_server, default_schema='unknown')
# Initialize our return status
return_status = True
if not server:
# This is a dirty hack; but it's the only work around to
# tgram:// messages since the bot_token has a colon in it.
# It invalidates an normal URL.
# This hack searches for this bogus URL and corrects it
# so we can properly load it further down. The other
# alternative is to ask users to actually change the colon
# into a slash (which will work too), but it's more likely
# to cause confusion... So this is the next best thing
tgram = re.match(
r'(?P<protocol>%s://)(bot)?(?P<prefix>([a-z0-9_-]+)'
r'(:[a-z0-9_-]+)?@)?(?P<btoken_a>[0-9]+):+'
r'(?P<remaining>.*)$' % 'tgram',
_server, re.I)
if tgram:
if tgram.group('prefix'):
server = self.parse_url('%s%s%s/%s' % (
tgram.group('protocol'),
tgram.group('prefix'),
tgram.group('btoken_a'),
tgram.group('remaining'),
),
default_schema='unknown',
)
else:
server = self.parse_url('%s%s/%s' % (
tgram.group('protocol'),
tgram.group('btoken_a'),
tgram.group('remaining'),
),
default_schema='unknown',
)
if not server:
# Failed to parse te server
self.logger.error('Could not parse URL: %s' % server)
return_status = False
continue
# Some basic validation
if server['schema'] not in SCHEMA_MAP:
self.logger.error(
'%s is not a supported server type.' %
server['schema'].upper(),
# Attempt to acquire the schema at the very least to allow
# our plugins to determine if they can make a better
# interpretation of a URL geared for them anyway.
schema = GET_SCHEMA_RE.match(_server)
if schema is None:
logger.error(
'%s is an unparseable server url.' % _server,
)
return_status = False
continue
notify_args = server.copy().items() + {
# Logger Details
'logger': self.logger,
# Base
'include_image': include_image,
'secure': (server['schema'][-1] == 's'),
# Support SSL Certificate 'verify' keyword
# Default to being enabled (True)
'verify': parse_bool(server['qsd'].get('verify', True)),
# Overrides
'override_image_url': image_url,
'override_image_path': image_path,
}.items()
# Update the schema
schema = schema.group('schema').lower()
# Grant our plugin access to manipulate the dictionary
if not SCHEMA_MAP[server['schema']].pre_parse(notify_args):
# Some basic validation
if schema not in SCHEMA_MAP:
logger.error(
'%s is not a supported server type.' % schema,
)
return_status = False
continue
# Parse our url details
# the server object is a dictionary containing all of the
# information parsed from our URL
results = SCHEMA_MAP[schema].parse_url(_server)
if not results:
# Failed to parse the server URL
logger.error('Could not parse URL: %s' % _server)
return_status = False
continue
try:
# Attempt to create an instance of our plugin using the parsed
# URL information
plugin = SCHEMA_MAP[results['schema']](**results)
except:
# the arguments are invalid or can not be used.
return_status = False
continue
# Add our entry to our list as it can be actioned at this point
self.servers.add(notify_args)
# Save our asset
if asset:
plugin.asset = asset
else:
plugin.asset = self.asset
# Add our initialized plugin to our server listings
self.servers.append(plugin)
# Return our status
return return_status
@ -167,9 +191,38 @@ class Apprise(object):
"""
self.servers.clear()
def notify(self, title='', body=''):
def notify(self, title, body, notify_type=NotifyType.SUCCESS, **kwargs):
"""
This should be over-rided by the class that inherits this one.
"""
Notifies all loaded servers using the content provided.
"""
# TODO: iterate over server entries and execute notification
# Initialize our return result
status = len(self.servers) > 0
if notify_type and notify_type not in NOTIFY_TYPES:
self.warning(
'An invalid notification type (%s) was specified.' % (
notify_type))
if not isinstance(body, basestring):
body = ''
if not isinstance(title, basestring):
title = ''
# Iterate over our loaded plugins
for server in self.servers:
try:
# Send notification
if not server.notify(title=title, body=body):
# Toggle our return status flag
status = False
except:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
# TODO: print backtrace
status = False
return status

146
apprise/AppriseAsset.py Normal file
View File

@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
#
# Apprise Asset
#
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
# apprise is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# apprise is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
import re
from os.path import join
from os.path import dirname
from os.path import isfile
from os.path import abspath
from .common import NotifyType
class AppriseAsset(object):
"""
Provides a supplimentary class that can be used to provide extra
information and details that can be used by Apprise such as providing
an alternate location to where images/icons can be found and the
URL masks.
"""
# A Simple Mapping of Colors; For every NOTIFY_TYPE identified,
# there should be a mapping to it's color here:
html_notify_map = {
NotifyType.INFO: '#3AA3E3',
NotifyType.SUCCESS: '#3AA337',
NotifyType.FAILURE: '#A32037',
NotifyType.WARNING: '#CACF29',
}
# The default theme
theme = 'default'
# Image URL Mask
image_url_mask = \
'http://nuxref.com/apprise/themes/{THEME}/apprise-{TYPE}-{XY}.png'
# Image Path Mask
image_path_mask = abspath(join(
dirname(__file__),
'assets',
'themes',
'{THEME}',
'apprise-{TYPE}-{XY}.png',
))
def __init__(self, image_path_mask=None, image_url_mask=None, theme=None):
"""
Asset Initialization
"""
if theme:
self.theme = theme
if image_path_mask:
self.image_path_mask = image_path_mask
if image_url_mask:
self.image_url_mask = image_url_mask
def html_color(self, notify_type):
"""
Returns an HTML mapped color based on passed in notify type
"""
# Attempt to get the type, otherwise return a default grey
# if we couldn't look up the entry
return self.html_notify_map.get(notify_type, '#888888')
def image_url(self, notify_type, image_size):
"""
Apply our mask to our image URL
"""
re_map = {
'{THEME}': self.theme,
'{TYPE}': notify_type,
'{XY}': image_size,
}
# Iterate over above list and store content accordingly
re_table = re.compile(
r'(' + '|'.join(re_map.keys()) + r')',
re.IGNORECASE,
)
return re_table.sub(lambda x: re_map[x.group()], self.image_url_mask)
def image_path(self, notify_type, image_size, must_exist=True):
"""
Apply our mask to our image file path
"""
re_map = {
'{THEME}': self.theme,
'{TYPE}': notify_type,
'{XY}': image_size,
}
# Iterate over above list and store content accordingly
re_table = re.compile(
r'(' + '|'.join(re_map.keys()) + r')',
re.IGNORECASE,
)
# Acquire our path
path = re_table.sub(lambda x: re_map[x.group()], self.image_path_mask)
if must_exist and not isfile(path):
return None
# Return what we parsed
return path
def image_raw(self, notify_type, image_size):
"""
Returns the raw image if it can (otherwise the function returns None)
"""
path = self.image_path(notify_type=notify_type, image_size=image_size)
if path:
try:
with open(path, 'rb') as fd:
return fd.read()
except (OSError, IOError):
# We can't access the file
pass
return None

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Supported Push Notifications Libraries
# base class for easier library inclusion
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,12 +19,23 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from .common import NotifyType
from .common import NOTIFY_TYPES
from .common import NOTIFY_IMAGE_SIZES
from .common import NotifyImageSize
from .plugins.NotifyBase import NotifyFormat
from .Apprise import Apprise
from .AppriseAsset import AppriseAsset
__version__ = '0.0.1'
__author__ = 'Chris Caron <lead2gold@gmail.com>'
__all__ = [
# Core
'Apprise',
'Apprise', 'AppriseAsset',
# Reference
'NotifyType', 'NotifyImageSize', 'NotifyFormat', 'NOTIFY_TYPES',
'NOTIFY_IMAGE_SIZES',
]

View File

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 16 KiB

View File

Before

Width:  |  Height:  |  Size: 41 KiB

After

Width:  |  Height:  |  Size: 41 KiB

View File

Before

Width:  |  Height:  |  Size: 7.5 KiB

After

Width:  |  Height:  |  Size: 7.5 KiB

View File

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 16 KiB

View File

Before

Width:  |  Height:  |  Size: 42 KiB

After

Width:  |  Height:  |  Size: 42 KiB

View File

Before

Width:  |  Height:  |  Size: 7.7 KiB

After

Width:  |  Height:  |  Size: 7.7 KiB

View File

Before

Width:  |  Height:  |  Size: 17 KiB

After

Width:  |  Height:  |  Size: 17 KiB

View File

Before

Width:  |  Height:  |  Size: 47 KiB

After

Width:  |  Height:  |  Size: 47 KiB

View File

Before

Width:  |  Height:  |  Size: 7.8 KiB

After

Width:  |  Height:  |  Size: 7.8 KiB

View File

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 16 KiB

View File

Before

Width:  |  Height:  |  Size: 42 KiB

After

Width:  |  Height:  |  Size: 42 KiB

View File

Before

Width:  |  Height:  |  Size: 7.7 KiB

After

Width:  |  Height:  |  Size: 7.7 KiB

56
apprise/common.py Normal file
View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
#
# Base Notify Wrapper
#
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
# apprise is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# apprise is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
class NotifyType(object):
"""
A simple mapping of notification types most commonly used with
all types of logging and notification services.
"""
INFO = 'info'
SUCCESS = 'success'
FAILURE = 'failure'
WARNING = 'warning'
NOTIFY_TYPES = (
NotifyType.INFO,
NotifyType.SUCCESS,
NotifyType.FAILURE,
NotifyType.WARNING,
)
class NotifyImageSize(object):
"""
A list of pre-defined image sizes to make it easier to work with defined
plugins.
"""
XY_72 = '72x72'
XY_128 = '128x128'
XY_256 = '256x256'
NOTIFY_IMAGE_SIZES = (
NotifyImageSize.XY_72,
NotifyImageSize.XY_128,
NotifyImageSize.XY_256,
)

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Base Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,20 +19,22 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from time import sleep
import re
import markdown
import logging
from os.path import join
from os.path import dirname
from os.path import abspath
from time import sleep
from urllib import unquote as _unquote
# For conversion
from chardet import detect as chardet_detect
from ..utils import parse_url
from ..utils import parse_bool
from ..common import NOTIFY_IMAGE_SIZES
from ..common import NOTIFY_TYPES
from ..AppriseAsset import AppriseAsset
# Define a general HTML Escaping
try:
# use sax first because it's faster
@ -55,46 +57,6 @@ except ImportError:
return cgi_escape(text, quote=True)
class NotifyType(object):
INFO = 'info'
SUCCESS = 'success'
FAILURE = 'failure'
WARNING = 'warning'
# Most Servers do not like more then 1 request per 5 seconds,
# so 5.5 gives us a safe play range...
NOTIFY_THROTTLE_SEC = 5.5
NOTIFY_TYPES = (
NotifyType.INFO,
NotifyType.SUCCESS,
NotifyType.FAILURE,
NotifyType.WARNING,
)
# A Simple Mapping of Colors; For every NOTIFY_TYPE identified,
# there should be a mapping to it's color here:
HTML_NOTIFY_MAP = {
NotifyType.INFO: '#3AA3E3',
NotifyType.SUCCESS: '#3AA337',
NotifyType.FAILURE: '#A32037',
NotifyType.WARNING: '#CACF29',
}
class NotifyImageSize(object):
XY_72 = '72x72'
XY_128 = '128x128'
XY_256 = '256x256'
NOTIFY_IMAGE_SIZES = (
NotifyImageSize.XY_72,
NotifyImageSize.XY_128,
NotifyImageSize.XY_256,
)
HTTP_ERROR_MAP = {
400: 'Bad Request - Unsupported Parameters.',
401: 'Verification Failed.',
@ -104,32 +66,23 @@ HTTP_ERROR_MAP = {
503: 'Servers are overloaded.',
}
# Application Identifier
NOTIFY_APPLICATION_ID = 'apprise'
NOTIFY_APPLICATION_DESC = 'Apprise Notifications'
# Image Control
NOTIFY_IMAGE_URL = \
'http://nuxref.com/apprise/apprise-{TYPE}-{XY}.png'
NOTIFY_IMAGE_FILE = abspath(join(
dirname(__file__),
'var',
'apprise-{TYPE}-{XY}.png',
))
# HTML New Line Delimiter
NOTIFY_NEWLINE = '\r\n'
NOTIFY_NEWLINE = '\n'
# Used to break a path list into parts
PATHSPLIT_LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
class NotifyFormat(object):
TEXT = 'text'
HTML = 'html'
MARKDOWN = 'markdown'
NOTIFY_FORMATS = (
NotifyFormat.TEXT,
NotifyFormat.HTML,
NotifyFormat.MARKDOWN,
)
# Regular expression retrieved from:
@ -152,26 +105,36 @@ class NotifyBase(object):
# The default simple (insecure) protocol
# all inheriting entries must provide their protocol lookup
# protocol:// (in this example they would specify 'protocol')
PROTOCOL = ''
protocol = ''
# The default secure protocol
# all inheriting entries must provide their protocol lookup
# protocols:// (in this example they would specify 'protocols')
# This value can be the same as the defined PROTOCOL.
SECURE_PROTOCOL = ''
# This value can be the same as the defined protocol.
secure_protocol = ''
# our Application identifier
app_id = 'Apprise'
# our Application description
app_desc = 'Apprise Notifications'
# Most Servers do not like more then 1 request per 5 seconds, so 5.5 gives
# us a safe play range...
throttle_attempt = 5.5
# Logging
logger = logging.getLogger(__name__)
def __init__(self, title_maxlen=100, body_maxlen=512,
notify_format=NotifyFormat.TEXT, image_size=None,
include_image=False, override_image_path=None,
secure=False, **kwargs):
"""
Initialize some general logging and common server arguments
that will keep things consistent when working with the
notifiers that will inherit this class
include_image=False, secure=False, throttle=None, **kwargs):
"""
Initialize some general logging and common server arguments that will
keep things consistent when working with the notifiers that will
inherit this class.
# Logging
self.logger = logging.getLogger(__name__)
"""
if notify_format.lower() not in NOTIFY_FORMATS:
self.logger.error(
@ -189,8 +152,8 @@ class NotifyBase(object):
'Invalid image size %s' % image_size,
)
self.app_id = NOTIFY_APPLICATION_ID
self.app_desc = NOTIFY_APPLICATION_DESC
# Prepare our Assets
self.asset = AppriseAsset()
self.notify_format = notify_format.lower()
self.title_maxlen = title_maxlen
@ -199,6 +162,10 @@ class NotifyBase(object):
self.include_image = include_image
self.secure = secure
if throttle:
# Custom throttle override
self.throttle_attempt = throttle
# Certificate Verification (for SSL calls); default to being enabled
self.verify_certificate = kwargs.get('verify', True)
@ -213,16 +180,19 @@ class NotifyBase(object):
self.user = kwargs.get('user')
self.password = kwargs.get('password')
# Over-rides
self.override_image_url = kwargs.get('override_image_url')
self.override_image_path = kwargs.get('override_image_path')
def throttle(self, throttle_time=NOTIFY_THROTTLE_SEC):
def throttle(self, throttle_time=None):
"""
A common throttle control
"""
self.logger.debug('Throttling...')
sleep(throttle_time)
throttle_time = throttle_time \
if throttle_time is not None else self.throttle_attempt
# Perform throttle
if throttle_time > 0:
sleep(throttle_time)
return
def image_url(self, notify_type):
@ -230,73 +200,46 @@ class NotifyBase(object):
Returns Image URL if possible
"""
if self.override_image_url:
# Over-ride
return self.override_image_url
if not self.image_size:
return None
if notify_type not in NOTIFY_TYPES:
return None
re_map = {
'{TYPE}': notify_type,
'{XY}': self.image_size,
}
# Iterate over above list and store content accordingly
re_table = re.compile(
r'(' + '|'.join(re_map.keys()) + r')',
re.IGNORECASE,
return self.asset.image_url(
notify_type=notify_type,
image_size=self.image_size,
)
return re_table.sub(lambda x: re_map[x.group()], NOTIFY_IMAGE_URL)
def image_path(self, notify_type):
"""
Returns the path of the image if it can
"""
if not self.image_size:
return None
if notify_type not in NOTIFY_TYPES:
return None
return self.asset.image_path(
notify_type=notify_type,
image_size=self.image_size,
)
def image_raw(self, notify_type):
"""
Returns the raw image if it can
"""
if not self.override_image_path:
if not self.image_size:
return None
if notify_type not in NOTIFY_TYPES:
return None
re_map = {
'{TYPE}': notify_type,
'{XY}': self.image_size,
}
# Iterate over above list and store content accordingly
re_table = re.compile(
r'(' + '|'.join(re_map.keys()) + r')',
re.IGNORECASE,
)
# Now we open and return the file
_file = re_table.sub(
lambda x: re_map[x.group()], NOTIFY_IMAGE_FILE)
else:
# Override Path Specified
_file = self.override_image_path
try:
fd = open(_file, 'rb')
except:
if not self.image_size:
return None
try:
return fd.read()
except:
if notify_type not in NOTIFY_TYPES:
return None
finally:
fd.close()
return self.asset.image_raw(
notify_type=notify_type,
image_size=self.image_size,
)
def escape_html(self, html, convert_new_lines=False):
"""
@ -379,67 +322,55 @@ class NotifyBase(object):
# we always return a list
return [html, ]
def notify(self, title, body, notify_type=NotifyType.SUCCESS,
**kwargs):
@staticmethod
def split_path(path, unquote=True):
"""
This should be over-rided by the class that
inherits this one.
"""
if notify_type and notify_type not in NOTIFY_TYPES:
self.warning(
'An invalid notification type (%s) was specified.' % (
notify_type))
if not isinstance(body, basestring):
body = ''
if not isinstance(title, basestring):
title = ''
# Ensure we're set up as UTF-8
title = self.to_utf8(title)
body = self.to_utf8(body)
if title:
title = title[0:self.title_maxlen]
if self.notify_format == NotifyFormat.HTML:
bodies = self.to_html(body=body)
elif self.notify_format == NotifyFormat.TEXT:
# TODO: this should split the content into
# multiple messages
bodies = [body[0:self.body_maxlen], ]
while len(bodies):
b = bodies.pop(0)
# Send Message(s)
if not self._notify(
title=title, body=b,
notify_type=notify_type,
**kwargs):
return False
# If we got here, we sent part of the notification
# if there are any left, we should throttle so we
# don't overload the server with requests (they
# might not be happy with us otherwise)
if len(bodies):
self.throttle()
return True
def pre_parse(self, url, server_settings):
"""
grants the ability to manipulate or additionally parse the content
provided in the server_settings variable.
Return True if you're satisfied with them (and may have additionally
changed them) and False if the settings are not acceptable or useable
Since this is the base class, plugins are not requird to overload it
but have the option to. By default the configuration is always
accepted.
Splits a URL up into a list object.
"""
return True
if unquote:
return PATHSPLIT_LIST_DELIM.split(_unquote(path).lstrip('/'))
return PATHSPLIT_LIST_DELIM.split(path.lstrip('/'))
@staticmethod
def is_email(address):
"""
Returns True if specified entry is an email address
"""
return IS_EMAIL_RE.match(address) is not None
@staticmethod
def parse_url(url):
"""
Parses the URL and returns it broken apart into a dictionary.
"""
results = parse_url(url, default_schema='unknown')
if not results:
# We're done; we failed to parse our url
return results
# if our URL ends with an 's', then assueme our secure flag is set.
results['secure'] = (results['schema'][-1] == 's')
# Our default notification format
results['notify_format'] = NotifyFormat.TEXT
# Support SSL Certificate 'verify' keyword. Default to being enabled
results['verify'] = True
if 'qsd' in results:
if 'verify' in results['qsd']:
parse_bool(results['qsd'].get('verify', True))
# Password overrides
if 'pass' in results['qsd']:
results['password'] = results['qsd']['pass']
# User overrides
if 'user' in results['qsd']:
results['user'] = results['qsd']['user']
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Boxcar Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -20,15 +20,11 @@
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from json import dumps
from urllib import unquote
import requests
import re
# Used to break apart list of potential tags by their delimiter
# into a usable list.
TAGS_LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
# Used to validate Tags, Aliases and Devices
@ -36,6 +32,10 @@ IS_TAG = re.compile(r'^[A-Za-z0-9]{1,63}$')
IS_ALIAS = re.compile(r'^[@]?[A-Za-z0-9]+$')
IS_DEVICETOKEN = re.compile(r'^[A-Za-z0-9]{64}$')
# Used to break apart list of potential tags by their delimiter
# into a usable list.
TAGS_LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
class NotifyBoxcar(NotifyBase):
"""
@ -43,29 +43,30 @@ class NotifyBoxcar(NotifyBase):
"""
# The default simple (insecure) protocol
PROTOCOL = 'boxcar'
protocol = 'boxcar'
# The default secure protocol
SECURE_PROTOCOL = 'boxcars'
secure_protocol = 'boxcars'
def __init__(self, recipients=None, **kwargs):
"""
Initialize Boxcar Object
"""
super(NotifyBoxcar, self).__init__(
title_maxlen=250, body_maxlen=10000,
notify_format=NotifyFormat.TEXT,
**kwargs)
title_maxlen=250, body_maxlen=10000, **kwargs)
if self.secure:
self.schema = 'https'
else:
self.schema = 'http'
# Initialize tag list
self.tags = list()
# Initialize alias list
self.aliases = list()
# Initialize device_token list
self.device_tokens = list()
@ -101,7 +102,7 @@ class NotifyBoxcar(NotifyBase):
)
continue
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Boxcar Notification
"""
@ -176,3 +177,28 @@ class NotifyBoxcar(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns it broken apart into a dictionary.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early
return None
# Acquire our recipients and include them in the response
try:
recipients = unquote(results['fullpath'])
except (AttributeError, KeyError):
# no recipients detected
recipients = ''
# Store our recipients
results['recipients'] = recipients
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Email Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -26,20 +26,12 @@ from smtplib import SMTP
from smtplib import SMTPException
from socket import error as SocketError
from urllib import unquote as unquote
from email.mime.text import MIMEText
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import IS_EMAIL_RE
# Default Non-Encryption Port
EMAIL_SMTP_PORT = 25
# Default Secure Port
EMAIL_SMTPS_PORT = 587
# Default SMTP Timeout (in seconds)
SMTP_SERVER_TIMEOUT = 30
class WebBaseLogin(object):
@ -49,15 +41,14 @@ class WebBaseLogin(object):
"""
# User Login must be Email Based
EMAIL = 'Email'
# User Login must UserID Based
USERID = 'UserID'
# To attempt to make this script stupid proof,
# if we detect an email address that is part of the
# this table, we can pre-use a lot more defaults if
# they aren't otherwise specified on the users
# input
# To attempt to make this script stupid proof, if we detect an email address
# that is part of the this table, we can pre-use a lot more defaults if they
# aren't otherwise specified on the users input.
WEBBASE_LOOKUP_TABLE = (
# Google GMail
(
@ -121,11 +112,6 @@ WEBBASE_LOOKUP_TABLE = (
),
)
# Mail Prefix Servers (TODO)
MAIL_SERVER_PREFIXES = (
'smtp', 'mail', 'smtps', 'outgoing'
)
class NotifyEmail(NotifyBase):
"""
@ -134,10 +120,19 @@ class NotifyEmail(NotifyBase):
"""
# The default simple (insecure) protocol
PROTOCOL = 'mailto'
protocol = 'mailto'
# The default secure protocol
SECURE_PROTOCOL = 'mailtos'
secure_protocol = 'mailtos'
# Default Non-Encryption Port
default_port = 25
# Default Secure Port
default_secure_port = 587
# Default SMTP Timeout (in seconds)
connect_timeout = 15
def __init__(self, to, notify_format, **kwargs):
"""
@ -154,15 +149,17 @@ class NotifyEmail(NotifyBase):
# Handle SMTP vs SMTPS (Secure vs UnSecure)
if not self.port:
if self.secure:
self.port = EMAIL_SMTPS_PORT
self.port = self.default_secure_port
else:
self.port = EMAIL_SMTP_PORT
self.port = self.default_port
# Email SMTP Server Timeout
try:
self.timeout = int(kwargs.get('timeout', SMTP_SERVER_TIMEOUT))
self.timeout = int(kwargs.get('timeout', self.connect_timeout))
except (ValueError, TypeError):
self.timeout = SMTP_SERVER_TIMEOUT
self.timeout = self.connect_timeout
# Now we want to construct the To and From email
# addresses from the URL provided
@ -175,13 +172,13 @@ class NotifyEmail(NotifyBase):
if not isinstance(self.to_addr, basestring):
raise TypeError('No valid ~To~ email address specified.')
if not IS_EMAIL_RE.match(self.to_addr):
if not NotifyBase.is_email(self.to_addr):
raise TypeError('Invalid ~To~ email format: %s' % self.to_addr)
if not isinstance(self.from_addr, basestring):
raise TypeError('No valid ~From~ email address specified.')
match = IS_EMAIL_RE.match(self.from_addr)
match = NotifyBase.is_email(self.from_addr)
if not match:
# Parse Source domain based on from_addr
raise TypeError('Invalid ~From~ email format: %s' % self.to_addr)
@ -202,10 +199,9 @@ class NotifyEmail(NotifyBase):
"""
if self.smtp_host:
# SMTP Server was explicitly specified, therefore it
# is assumed the caller knows what he's doing and
# is intentionally over-riding any smarts to be
# applied
# SMTP Server was explicitly specified, therefore it is assumed
# the caller knows what he's doing and is intentionally
# over-riding any smarts to be applied
return
for i in range(len(WEBBASE_LOOKUP_TABLE)):
@ -235,7 +231,7 @@ class NotifyEmail(NotifyBase):
login_type = WEBBASE_LOOKUP_TABLE[i][2]\
.get('login_type', [])
if IS_EMAIL_RE.match(self.user) and \
if NotifyBase.is_email(self.user) and \
WebBaseLogin.EMAIL not in login_type:
# Email specified but login type
# not supported; switch it to user id
@ -248,7 +244,7 @@ class NotifyEmail(NotifyBase):
break
def _notify(self, title, body, **kwargs):
def notify(self, title, body, **kwargs):
"""
Perform Email Notification
"""
@ -263,6 +259,7 @@ class NotifyEmail(NotifyBase):
if self.notify_format == NotifyFormat.HTML:
email = MIMEText(body, 'html')
email['Content-Type'] = 'text/html'
else:
email = MIMEText(body, 'text')
email['Content-Type'] = 'text/plain'
@ -310,8 +307,120 @@ class NotifyEmail(NotifyBase):
try:
socket.quit()
except:
# no problem
pass
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
# Default Format is HTML
results['notify_format'] = NotifyFormat.HTML
to_addr = ''
from_addr = ''
smtp_host = ''
if 'format' in results['qsd'] and len(results['qsd']['format']):
# Extract email format (Text/Html)
try:
format = unquote(results['qsd']['format']).lower()
if len(format) > 0 and format[0] == 't':
results['notify_format'] = NotifyFormat.TEXT
except AttributeError:
pass
# get 'To' email address
try:
to_addr = filter(bool, NotifyBase.split_path(results['host']))[0]
except (AttributeError, IndexError):
# No problem, we have other ways of getting
# the To address
pass
if not NotifyBase.is_email(to_addr):
if results['user']:
# Try to be clever and build a potential
# email address based on what we've been provided
to_addr = '%s@%s' % (
re.split('[\s@]+', results['user'])[0],
re.split('[\s@]+', to_addr)[-1],
)
if not NotifyBase.is_email(to_addr):
NotifyBase.logger.error(
'%s does not contain a recipient email.' %
unquote(results['url'].lstrip('/')),
)
return None
# Attempt to detect 'from' email address
from_addr = to_addr
try:
if 'from' in results['qsd'] and len(results['qsd']['from']):
from_addr = results['qsd']['from']
if not NotifyBase.is_email(results['qsd']['from']):
# Lets be clever and attempt to make the from
# address email
from_addr = '%s@%s' % (
re.split('[\s@]+', from_addr)[0],
re.split('[\s@]+', to_addr)[-1],
)
if not NotifyBase.is_email(from_addr):
NotifyBase.logger.error(
'%s does not contain a from address.' %
unquote(results['url'].lstrip('/')),
)
return None
except AttributeError:
pass
try:
if 'name' in results['qsd'] and len(results['qsd']['name']):
# Extract from name to associate with from address
results['name'] = unquote(results['qsd']['name'])
except AttributeError:
pass
try:
if 'timeout' in results['qsd'] and len(results['qsd']['timeout']):
# Extract the timeout to associate with smtp server
results['timeout'] = unquote(results['qsd']['timeout'])
except AttributeError:
pass
# Store SMTP Host if specified
try:
# Extract from password to associate with smtp server
if 'smtp' in results['qsd'] and len(results['qsd']['smtp']):
smtp_host = unquote(results['qsd']['smtp'])
except AttributeError:
pass
results['to'] = to_addr
results['from'] = from_addr
results['smtp_host'] = smtp_host
return results

View File

@ -1,4 +1,4 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Faast Notify Wrapper
#
@ -22,12 +22,8 @@
import requests
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import NotifyImageSize
from .NotifyBase import HTTP_ERROR_MAP
# Faast uses the http protocol with JSON requests
FAAST_URL = 'https://www.appnotifications.com/account/notifications.json'
from ..common import NotifyImageSize
# Image Support (72x72)
FAAST_IMAGE_XY = NotifyImageSize.XY_72
@ -39,24 +35,22 @@ class NotifyFaast(NotifyBase):
"""
# The default protocol (this is secure for faast)
PROTOCOL = 'faast'
protocol = 'faast'
# The default secure protocol
SECURE_PROTOCOL = 'faast'
# Faast uses the http protocol with JSON requests
notify_url = 'https://www.appnotifications.com/account/notifications.json'
def __init__(self, authtoken, **kwargs):
"""
Initialize Faast Object
"""
super(NotifyFaast, self).__init__(
title_maxlen=250, body_maxlen=32768,
image_size=FAAST_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
title_maxlen=250, body_maxlen=32768, image_size=FAAST_IMAGE_XY,
**kwargs)
self.authtoken = authtoken
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Faast Notification
"""
@ -81,12 +75,12 @@ class NotifyFaast(NotifyBase):
payload['icon_url'] = image_url
self.logger.debug('Faast POST URL: %s (cert_verify=%r)' % (
FAAST_URL, self.verify_certificate,
self.notify_url, self.verify_certificate,
))
self.logger.debug('Faast Payload: %s' % str(payload))
try:
r = requests.post(
FAAST_URL,
self.notify_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
@ -121,3 +115,23 @@ class NotifyFaast(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
# Store our authtoken using the host
results['authtoken'] = results['host']
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Growl Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -18,17 +18,15 @@
#
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from ..NotifyBase import NotifyBase
from ..NotifyBase import NotifyFormat
from ..NotifyBase import NotifyImageSize
import re
from urllib import unquote
from .gntp.notifier import GrowlNotifier
from .gntp.errors import NetworkError as GrowlNetworkError
from .gntp.errors import AuthError as GrowlAuthenticationError
# Default Growl Port
GROWL_UDP_PORT = 23053
from ..NotifyBase import NotifyBase
from ...common import NotifyImageSize
# Image Support (72x72)
GROWL_IMAGE_XY = NotifyImageSize.XY_72
@ -61,10 +59,10 @@ class NotifyGrowl(NotifyBase):
"""
# The default protocol
PROTOCOL = 'growl'
protocol = 'growl'
# The default secure protocol
SECURE_PROTOCOL = 'growl'
# Default Growl Port
default_port = 23053
def __init__(self, priority=GrowlPriority.NORMAL, version=2, **kwargs):
"""
@ -72,19 +70,18 @@ class NotifyGrowl(NotifyBase):
"""
super(NotifyGrowl, self).__init__(
title_maxlen=250, body_maxlen=32768,
image_size=GROWL_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
**kwargs)
image_size=GROWL_IMAGE_XY, **kwargs)
# A Global flag that tracks registration
self.is_registered = False
if not self.port:
self.port = GROWL_UDP_PORT
self.port = self.default_port
# The Priority of the message
if priority not in GROWL_PRIORITIES:
self.priority = GrowlPriority.NORMAL
else:
self.priority = priority
@ -130,7 +127,7 @@ class NotifyGrowl(NotifyBase):
return
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Growl Notification
"""
@ -139,6 +136,12 @@ class NotifyGrowl(NotifyBase):
# We can't do anything
return None
# Limit results to just the first 2 line otherwise there is just to
# much content to display
body = re.split('[\r\n]+', body)
body[0] = body[0].strip('#').strip()
body = '\r\n'.join(body[0:2])
icon = None
if self.include_image:
if self.version >= 2:
@ -175,13 +178,13 @@ class NotifyGrowl(NotifyBase):
)
except GrowlNetworkError as e:
# Since Growl servers listen for UDP broadcasts,
# it's possible that you will never get to this part
# of the code since there is no acknowledgement as to
# whether it accepted what was sent to it or not.
# Since Growl servers listen for UDP broadcasts, it's possible
# that you will never get to this part of the code since there is
# no acknowledgement as to whether it accepted what was sent to it
# or not.
# however, if the host/server is unavailable, you will
# get to this point of the code.
# However, if the host/server is unavailable, you will get to this
# point of the code.
self.logger.warning(
'A Connection error occured sending Growl '
'notification to %s.' % self.host)
@ -191,3 +194,43 @@ class NotifyGrowl(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
version = None
if 'version' in results['qsd'] and len(results['qsd']['version']):
# Allow the user to specify the version of the protocol to use.
try:
version = int(
unquote(results['qsd']['version']).strip().split('.')[0])
except (AttributeError, IndexError, TypeError, ValueError):
NotifyBase.logger.warning(
'An invalid Growl version of "%s" was specified and will '
'be ignored.' % results['qsd']['version']
)
pass
# Because of the URL formatting, the password is actually where the
# username field is. For this reason, we just preform this small hack
# to make it (the URL) conform correctly. The following strips out the
# existing password entry (if exists) so that it can be swapped with
# the new one we specify.
results['user'] = None
results['password'] = results.get('user', None)
if version:
results['version'] = version
return results

View File

@ -1,4 +1,4 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
from . import NotifyGrowl
__all__ = [

View File

@ -10,8 +10,8 @@ programs using gntp
import logging
import os
from . import gntp.notifier
from . import gntp.shim
from .gntp import notifier
from .gntp import shim
__all__ = [
'mini',

View File

@ -1,4 +1,4 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# JSON Notify Wrapper
#
@ -19,13 +19,12 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from json import dumps
import requests
from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import NotifyImageSize
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
# Image Support (128x128)
JSON_IMAGE_XY = NotifyImageSize.XY_128
@ -37,19 +36,17 @@ class NotifyJSON(NotifyBase):
"""
# The default protocol
PROTOCOL = 'json'
protocol = 'json'
# The default secure protocol
SECURE_PROTOCOL = 'jsons'
secure_protocol = 'jsons'
def __init__(self, **kwargs):
"""
Initialize JSON Object
"""
super(NotifyJSON, self).__init__(
title_maxlen=250, body_maxlen=32768,
image_size=JSON_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
title_maxlen=250, body_maxlen=32768, image_size=JSON_IMAGE_XY,
**kwargs)
if self.secure:
@ -64,7 +61,7 @@ class NotifyJSON(NotifyBase):
return
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform JSON Notification
"""

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Join Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -29,25 +29,17 @@
# You can download the app for your phone here:
# https://play.google.com/store/apps/details?id=com.joaomgcd.join
import requests
import re
import requests
from urllib import urlencode
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
from .NotifyBase import NotifyImageSize
# Join uses the http protocol with JSON requests
JOIN_URL = 'https://joinjoaomgcd.appspot.com/_ah/api/messaging/v1/sendPush'
from ..common import NotifyImageSize
# Token required as part of the API request
VALIDATE_APIKEY = re.compile(r'[A-Za-z0-9]{32}')
# Default User
JOIN_DEFAULT_USER = 'apprise'
# Extend HTTP Error Messages
JOIN_HTTP_ERROR_MAP = dict(HTTP_ERROR_MAP.items() + {
401: 'Unauthorized - Invalid Token.',
@ -75,25 +67,25 @@ class NotifyJoin(NotifyBase):
"""
# The default protocol
PROTOCOL = 'join'
protocol = 'join'
# The default secure protocol
SECURE_PROTOCOL = 'join'
# Join uses the http protocol with JSON requests
notify_url = \
'https://joinjoaomgcd.appspot.com/_ah/api/messaging/v1/sendPush'
def __init__(self, apikey, devices, **kwargs):
"""
Initialize Join Object
"""
super(NotifyJoin, self).__init__(
title_maxlen=250, body_maxlen=1000,
image_size=JOIN_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
title_maxlen=250, body_maxlen=1000, image_size=JOIN_IMAGE_XY,
**kwargs)
if not VALIDATE_APIKEY.match(apikey.strip()):
self.logger.warning(
'The first API Token specified (%s) is invalid.' % apikey,
)
raise TypeError(
'The first API Token specified (%s) is invalid.' % apikey,
)
@ -105,8 +97,10 @@ class NotifyJoin(NotifyBase):
self.devices = filter(bool, DEVICE_LIST_DELIM.split(
devices,
))
elif isinstance(devices, (tuple, list)):
self.devices = devices
else:
self.devices = list()
@ -114,11 +108,17 @@ class NotifyJoin(NotifyBase):
self.logger.warning('No device(s) were specified.')
raise TypeError('No device(s) were specified.')
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Join Notification
"""
# Limit results to just the first 2 line otherwise
# there is just to much content to display
body = re.split('[\r\n]+', body)
body[0] = body[0].strip('#').strip()
body = '\r\n'.join(body[0:2])
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded',
@ -158,11 +158,10 @@ class NotifyJoin(NotifyBase):
url_args['icon'] = image_url
# prepare payload
payload = {
}
payload = {}
# Prepare the URL
url = '%s?%s' % (JOIN_URL, urlencode(url_args))
url = '%s?%s' % (self.notify_url, urlencode(url_args))
self.logger.debug('Join POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
@ -194,6 +193,7 @@ class NotifyJoin(NotifyBase):
r.status_code))
# self.logger.debug('Response Details: %s' % r.raw.read())
# Return; we're done
has_error = True
@ -210,3 +210,31 @@ class NotifyJoin(NotifyBase):
self.throttle()
return has_error
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
try:
devices = ' '.join(
filter(bool, NotifyBase.split_path(results['fullpath'])))
except (AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
devices = None
results['apikey'] = results['host']
results['devices'] = devices
return results

View File

@ -1,4 +1,4 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# MatterMost Notify Wrapper
#
@ -19,15 +19,14 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from json import dumps
import re
import requests
from json import dumps
from urllib import unquote as unquote
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import NotifyImageSize
from .NotifyBase import HTTP_ERROR_MAP
from .NotifyBase import NOTIFY_APPLICATION_ID
import re
from ..common import NotifyImageSize
# Some Reference Locations:
# - https://docs.mattermost.com/developer/webhooks-incoming.html
@ -39,9 +38,6 @@ VALIDATE_AUTHTOKEN = re.compile(r'[A-Za-z0-9]{24,32}')
# Image Support (72x72)
MATTERMOST_IMAGE_XY = NotifyImageSize.XY_72
# MATTERMOST uses the http protocol with JSON requests
MATTERMOST_PORT = 8065
class NotifyMatterMost(NotifyBase):
"""
@ -49,23 +45,25 @@ class NotifyMatterMost(NotifyBase):
"""
# The default protocol
PROTOCOL = 'mmost'
protocol = 'mmost'
# The default secure protocol
SECURE_PROTOCOL = 'mmosts'
secure_protocol = 'mmosts'
# The default Mattermost port
default_port = 8065
def __init__(self, authtoken, channel=None, **kwargs):
"""
Initialize MatterMost Object
"""
super(NotifyMatterMost, self).__init__(
title_maxlen=250, body_maxlen=4000,
image_size=MATTERMOST_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
title_maxlen=250, body_maxlen=4000, image_size=MATTERMOST_IMAGE_XY,
**kwargs)
if self.secure:
self.schema = 'https'
else:
self.schema = 'http'
@ -93,11 +91,11 @@ class NotifyMatterMost(NotifyBase):
self.channel = channel
if not self.port:
self.port = MATTERMOST_PORT
self.port = self.default_port
return
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform MatterMost Notification
"""
@ -117,7 +115,7 @@ class NotifyMatterMost(NotifyBase):
payload['username'] = self.user
else:
payload['username'] = NOTIFY_APPLICATION_ID
payload['username'] = self.app_id
if self.channel:
payload['channel'] = self.channel
@ -170,3 +168,44 @@ class NotifyMatterMost(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
try:
authtoken = filter(
bool, NotifyBase.split_path(results['fullpath']))[0]
except (AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
authtoken = None
channel = None
if 'channel' in results['qsd'] and len(results['qsd']['channel']):
# Allow the user to specify the channel to post to
try:
channel = unquote(results['qsd']['channel']).strip()
except (AttributeError, TypeError, ValueError):
NotifyBase.logger.warning(
'An invalid MatterMost channel of "%s" was specified and '
'will be ignored.' % results['qsd']['channel']
)
pass
results['authtoken'] = authtoken
results['channel'] = channel
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Notify My Android (NMA) Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,16 +19,14 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
import requests
import re
import requests
from urllib import unquote
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
# Notify My Android uses the http protocol with JSON requests
NMA_URL = 'https://www.notifymyandroid.com/publicapi/notify'
# Extend HTTP Error Messages
NMA_HTTP_ERROR_MAP = dict(HTTP_ERROR_MAP.items() + {
400: 'Data is wrong format, invalid length or null.',
@ -64,10 +62,10 @@ class NotifyMyAndroid(NotifyBase):
"""
# The default protocol
PROTOCOL = 'nma'
protocol = 'nma'
# The default secure protocol
SECURE_PROTOCOL = 'nma'
# Notify My Android uses the http protocol with JSON requests
notify_url = 'https://www.notifymyandroid.com/publicapi/notify'
def __init__(self, apikey, priority=NotifyMyAndroidPriority.NORMAL,
devapikey=None, **kwargs):
@ -75,13 +73,12 @@ class NotifyMyAndroid(NotifyBase):
Initialize Notify My Android Object
"""
super(NotifyMyAndroid, self).__init__(
title_maxlen=1000, body_maxlen=10000,
notify_format=NotifyFormat.HTML,
**kwargs)
title_maxlen=1000, body_maxlen=10000, **kwargs)
# The Priority of the message
if priority not in NMA_PRIORITIES:
self.priority = NotifyMyAndroidPriority.NORMAL
else:
self.priority = priority
@ -106,7 +103,7 @@ class NotifyMyAndroid(NotifyBase):
)
self.devapikey = devapikey
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Notify My Android Notification
"""
@ -131,12 +128,12 @@ class NotifyMyAndroid(NotifyBase):
payload['developerkey'] = self.devapikey
self.logger.debug('NMA POST URL: %s (cert_verify=%r)' % (
NMA_URL, self.verify_certificate,
self.notify_url, self.verify_certificate,
))
self.logger.debug('NMA Payload: %s' % str(payload))
try:
r = requests.post(
NMA_URL,
self.notify_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
@ -171,3 +168,33 @@ class NotifyMyAndroid(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
results['notify_format'] = NotifyFormat.HTML
if 'format' in results['qsd'] and len(results['qsd']['format']):
# Extract email format (Text/Html)
try:
format = unquote(results['qsd']['format']).lower()
if len(format) > 0 and format[0] == 't':
results['notify_format'] = NotifyFormat.TEXT
except AttributeError:
pass
results['apikey'] = results['host']
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Prowl Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,16 +19,12 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
import requests
import re
import requests
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
# Prowl uses the http protocol with JSON requests
PROWL_URL = 'https://api.prowlapp.com/publicapi/add'
# Used to validate API Key
VALIDATE_APIKEY = re.compile(r'[A-Za-z0-9]{40}')
@ -65,25 +61,23 @@ class NotifyProwl(NotifyBase):
A wrapper for Prowl Notifications
"""
# The default protocol
PROTOCOL = 'prowl'
# The default secure protocol
SECURE_PROTOCOL = 'prowl'
secure_protocol = 'prowl'
def __init__(self, apikey, providerkey=None,
priority=ProwlPriority.NORMAL,
# Prowl uses the http protocol with JSON requests
notify_url = 'https://api.prowlapp.com/publicapi/add'
def __init__(self, apikey, providerkey=None, priority=ProwlPriority.NORMAL,
**kwargs):
"""
Initialize Prowl Object
"""
super(NotifyProwl, self).__init__(
title_maxlen=1024, body_maxlen=10000,
notify_format=NotifyFormat.TEXT,
**kwargs)
title_maxlen=1024, body_maxlen=10000, **kwargs)
if priority not in PROWL_PRIORITIES:
self.priority = ProwlPriority.NORMAL
else:
self.priority = priority
@ -112,7 +106,7 @@ class NotifyProwl(NotifyBase):
# Store the Provider Key
self.providerkey = providerkey
def _notify(self, title, body, **kwargs):
def notify(self, title, body, **kwargs):
"""
Perform Prowl Notification
"""
@ -135,12 +129,12 @@ class NotifyProwl(NotifyBase):
payload['providerkey'] = self.providerkey
self.logger.debug('Prowl POST URL: %s (cert_verify=%r)' % (
PROWL_URL, self.verify_certificate,
self.notify_url, self.verify_certificate,
))
self.logger.debug('Prowl Payload: %s' % str(payload))
try:
r = requests.post(
PROWL_URL,
self.notify_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
@ -161,6 +155,7 @@ class NotifyProwl(NotifyBase):
r.status_code))
self.logger.debug('Response Details: %s' % r.raw.read())
# Return; we're done
return False
else:
@ -175,3 +170,34 @@ class NotifyProwl(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
# optionally find the provider key
try:
providerkey = filter(
bool, NotifyBase.split_path(results['fullpath']))[0]
if not providerkey:
providerkey = None
except (AttributeError, IndexError):
providerkey = None
results['apikey'] = results['host']
results['providerkey'] = providerkey
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# PushBullet Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,21 +19,18 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from json import dumps
import requests
import re
import requests
from json import dumps
from urllib import unquote
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
from .NotifyBase import IS_EMAIL_RE
# Flag used as a placeholder to sending to all devices
PUSHBULLET_SEND_TO_ALL = 'ALL_DEVICES'
# PushBullet uses the http protocol with JSON requests
PUSHBULLET_URL = 'https://api.pushbullet.com/v2/pushes'
# Used to break apart list of potential recipients by their delimiter
# into a usable list.
RECIPIENTS_LIST_DELIM = re.compile(r'[ \t\r\n,\\/]+')
@ -50,34 +47,37 @@ class NotifyPushBullet(NotifyBase):
"""
# The default protocol
PROTOCOL = 'pbul'
protocol = 'pbul'
# The default secure protocol
SECURE_PROTOCOL = 'pbul'
secure_protocol = 'pbul'
# PushBullet uses the http protocol with JSON requests
notify_url = 'https://api.pushbullet.com/v2/pushes'
def __init__(self, accesstoken, recipients=None, **kwargs):
"""
Initialize PushBullet Object
"""
super(NotifyPushBullet, self).__init__(
title_maxlen=250, body_maxlen=32768,
notify_format=NotifyFormat.TEXT,
**kwargs)
title_maxlen=250, body_maxlen=32768, **kwargs)
self.accesstoken = accesstoken
if isinstance(recipients, basestring):
self.recipients = filter(bool, RECIPIENTS_LIST_DELIM.split(
recipients,
))
elif isinstance(recipients, (tuple, list)):
self.recipients = recipients
else:
self.recipients = list()
if len(self.recipients) == 0:
self.recipients = (PUSHBULLET_SEND_TO_ALL, )
def _notify(self, title, body, **kwargs):
def notify(self, title, body, **kwargs):
"""
Perform PushBullet Notification
"""
@ -122,12 +122,12 @@ class NotifyPushBullet(NotifyBase):
"Recipient '%s' is a device" % recipient)
self.logger.debug('PushBullet POST URL: %s (cert_verify=%r)' % (
PUSHBULLET_URL, self.verify_certificate,
self.notify_url, self.verify_certificate,
))
self.logger.debug('PushBullet Payload: %s' % str(payload))
try:
r = requests.post(
PUSHBULLET_URL,
self.notify_url,
data=dumps(payload),
headers=headers,
auth=auth,
@ -165,3 +165,28 @@ class NotifyPushBullet(NotifyBase):
self.throttle()
return not has_error
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
try:
recipients = unquote(results['fullpath'])
except AttributeError:
recipients = ''
results['accesstoken'] = results['host']
results['recipients'] = recipients
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Pushalot Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,17 +19,13 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from json import dumps
import requests
import re
import requests
from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import NotifyImageSize
from .NotifyBase import HTTP_ERROR_MAP
# Pushalot uses the http protocol with JSON requests
PUSHALOT_URL = 'https://pushalot.com/api/sendmessage'
from ..common import NotifyImageSize
# Image Support (72x72)
PUSHALOT_IMAGE_XY = NotifyImageSize.XY_72
@ -50,10 +46,13 @@ class NotifyPushalot(NotifyBase):
"""
# The default protocol
PROTOCOL = 'palot'
protocol = 'palot'
# The default secure protocol
SECURE_PROTOCOL = 'palot'
secure_protocol = 'palot'
# Pushalot uses the http protocol with JSON requests
notify_url = 'https://pushalot.com/api/sendmessage'
def __init__(self, authtoken, is_important=False, **kwargs):
"""
@ -61,9 +60,7 @@ class NotifyPushalot(NotifyBase):
"""
super(NotifyPushalot, self).__init__(
title_maxlen=250, body_maxlen=32768,
image_size=PUSHALOT_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
**kwargs)
image_size=PUSHALOT_IMAGE_XY, **kwargs)
# Is Important Flag
self.is_important = is_important
@ -78,7 +75,7 @@ class NotifyPushalot(NotifyBase):
'Invalid Pushalot Authorization Token Specified.'
)
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Pushalot Notification
"""
@ -105,16 +102,17 @@ class NotifyPushalot(NotifyBase):
payload['Image'] = image_url
self.logger.debug('Pushalot POST URL: %s (cert_verify=%r)' % (
PUSHALOT_URL, self.verify_certificate,
self.notify_url, self.verify_certificate,
))
self.logger.debug('Pushalot Payload: %s' % str(payload))
try:
r = requests.post(
PUSHALOT_URL,
self.notify_url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
try:
@ -144,3 +142,21 @@ class NotifyPushalot(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
results['authtoken'] = results['host']
return results

View File

@ -1,76 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Pushjet Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
# apprise is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# apprise is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from pushjet import errors
from pushjet import pushjet
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
class NotifyPushjet(NotifyBase):
"""
A wrapper for Pushjet Notifications
"""
# The default protocol
PROTOCOL = 'pjet'
# The default secure protocol
SECURE_PROTOCOL = 'pjets'
def __init__(self, **kwargs):
"""
Initialize Pushjet Object
"""
super(NotifyPushjet, self).__init__(
title_maxlen=250, body_maxlen=32768,
notify_format=NotifyFormat.TEXT,
**kwargs)
def _notify(self, title, body, notify_type):
"""
Perform Pushjet Notification
"""
try:
if self.user and self.host:
server = "http://"
if self.secure:
server = "https://"
server += self.host
if self.port:
server += ":" + str(self.port)
api = pushjet.Api(server)
service = api.Service(secret_key=self.user)
else:
api = pushjet.Api(pushjet.DEFAULT_API_URL)
service = api.Service(secret_key=self.host)
service.send(body, title)
except (errors.PushjetError, ValueError) as e:
self.logger.warning('Failed to send Pushjet notification.')
self.logger.debug('Pushjet Exception: %s' % str(e))
return False
return True

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Pushjet Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -23,7 +23,6 @@ from .pushjet import errors
from .pushjet import pushjet
from ..NotifyBase import NotifyBase
from ..NotifyBase import NotifyFormat
class NotifyPushjet(NotifyBase):
@ -32,21 +31,19 @@ class NotifyPushjet(NotifyBase):
"""
# The default protocol
PROTOCOL = 'pjet'
protocol = 'pjet'
# The default secure protocol
SECURE_PROTOCOL = 'pjets'
secure_protocol = 'pjets'
def __init__(self, **kwargs):
"""
Initialize Pushjet Object
"""
super(NotifyPushjet, self).__init__(
title_maxlen=250, body_maxlen=32768,
notify_format=NotifyFormat.TEXT,
**kwargs)
title_maxlen=250, body_maxlen=32768, **kwargs)
def _notify(self, title, body, notify_type):
def notify(self, title, body, notify_type):
"""
Perform Pushjet Notification
"""
@ -62,6 +59,7 @@ class NotifyPushjet(NotifyBase):
api = pushjet.Api(server)
service = api.Service(secret_key=self.user)
else:
api = pushjet.Api(pushjet.DEFAULT_API_URL)
service = api.Service(secret_key=self.host)

View File

@ -1,4 +1,4 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
from . import NotifyPushjet
__all__ = [

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Pushover Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,19 +19,16 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
import requests
import re
import requests
from urllib import unquote
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
# Flag used as a placeholder to sending to all devices
PUSHOVER_SEND_TO_ALL = 'ALL_DEVICES'
# Pushover uses the http protocol with JSON requests
PUSHOVER_URL = 'https://api.pushover.net/1/messages.json'
# Used to validate API Key
VALIDATE_TOKEN = re.compile(r'[A-Za-z0-9]{30}')
@ -74,21 +71,21 @@ class NotifyPushover(NotifyBase):
"""
# The default protocol
PROTOCOL = 'pover'
protocol = 'pover'
# The default secure protocol
SECURE_PROTOCOL = 'pover'
secure_protocol = 'pover'
# Pushover uses the http protocol with JSON requests
notify_url = 'https://api.pushover.net/1/messages.json'
def __init__(self, token, devices=None,
priority=PushoverPriority.NORMAL,
**kwargs):
priority=PushoverPriority.NORMAL, **kwargs):
"""
Initialize Pushover Object
"""
super(NotifyPushover, self).__init__(
title_maxlen=250, body_maxlen=512,
notify_format=NotifyFormat.TEXT,
**kwargs)
title_maxlen=250, body_maxlen=512, **kwargs)
if not VALIDATE_TOKEN.match(token.strip()):
self.logger.warning(
@ -135,7 +132,7 @@ class NotifyPushover(NotifyBase):
'The user/group specified (%s) is invalid.' % self.user,
)
def _notify(self, title, body, **kwargs):
def notify(self, title, body, **kwargs):
"""
Perform Pushover Notification
"""
@ -174,12 +171,12 @@ class NotifyPushover(NotifyBase):
payload['device'] = device
self.logger.debug('Pushover POST URL: %s (cert_verify=%r)' % (
PUSHOVER_URL, self.verify_certificate,
self.notify_url, self.verify_certificate,
))
self.logger.debug('Pushover Payload: %s' % str(payload))
try:
r = requests.post(
PUSHOVER_URL,
self.notify_url,
data=payload,
headers=headers,
auth=auth,
@ -220,3 +217,28 @@ class NotifyPushover(NotifyBase):
self.throttle()
return has_error
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
try:
devices = unquote(results['fullpath'])
except AttributeError:
devices = ''
results['token'] = results['host']
results['devices'] = devices
return results

View File

@ -1,4 +1,4 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Notify Rocket.Chat Notify Wrapper
#
@ -19,12 +19,12 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
import requests
import json
import re
import requests
from json import loads
from urllib import unquote
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
IS_CHANNEL = re.compile(r'^#(?P<name>[A-Za-z0-9]+)$')
@ -47,19 +47,17 @@ class NotifyRocketChat(NotifyBase):
"""
# The default protocol
PROTOCOL = 'rocket'
protocol = 'rocket'
# The default secure protocol
SECURE_PROTOCOL = 'rockets'
secure_protocol = 'rockets'
def __init__(self, recipients=None, **kwargs):
"""
Initialize Notify Rocket.Chat Object
"""
super(NotifyRocketChat, self).__init__(
title_maxlen=200, body_maxlen=32768,
notify_format=NotifyFormat.TEXT,
**kwargs)
title_maxlen=200, body_maxlen=32768, **kwargs)
if self.secure:
self.schema = 'https'
@ -127,7 +125,7 @@ class NotifyRocketChat(NotifyBase):
'Authentication to Rocket.Chat server failed.'
)
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
wrapper to send_notification since we can alert more then one channel
"""
@ -235,7 +233,7 @@ class NotifyRocketChat(NotifyBase):
else:
self.logger.debug('Rocket.Chat authentication successful')
response = json.loads(r.text)
response = loads(r.text)
if response.get('status') != "success":
self.logger.warning(
'Could not authenticate with Rocket.Chat server.')
@ -305,3 +303,25 @@ class NotifyRocketChat(NotifyBase):
# We're no longer authenticated now
self.authenticated = False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
try:
results['recipients'] = unquote(results['fullpath'])
except AttributeError:
return None
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Slack Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -31,20 +31,14 @@
# These are important <--------------^---------^---------------^
#
#
import requests
import re
import requests
from json import dumps
from time import time
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
from .NotifyBase import HTML_NOTIFY_MAP
from .NotifyBase import NotifyImageSize
# Slack uses the http protocol with JSON requests
SLACK_URL = 'https://hooks.slack.com/services'
from ..common import NotifyImageSize
# Token required as part of the API request
# /AAAAAAAAA/........./........................
@ -81,11 +75,11 @@ class NotifySlack(NotifyBase):
A wrapper for Slack Notifications
"""
# The default protocol
PROTOCOL = 'slack'
# The default secure protocol
SECURE_PROTOCOL = 'slack'
secure_protocol = 'slack'
# Slack uses the http protocol with JSON requests
notify_url = 'https://hooks.slack.com/services'
def __init__(self, token_a, token_b, token_c, channels, **kwargs):
"""
@ -93,9 +87,7 @@ class NotifySlack(NotifyBase):
"""
super(NotifySlack, self).__init__(
title_maxlen=250, body_maxlen=1000,
image_size=SLACK_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
**kwargs)
image_size=SLACK_IMAGE_XY, **kwargs)
if not VALIDATE_TOKEN_A.match(token_a.strip()):
self.logger.warning(
@ -165,7 +157,7 @@ class NotifySlack(NotifyBase):
re.IGNORECASE,
)
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Slack Notification
"""
@ -186,7 +178,7 @@ class NotifySlack(NotifyBase):
lambda x: self._re_formatting_map[x.group()], body,
)
url = '%s/%s/%s/%s' % (
SLACK_URL,
self.notify_url,
self.token_a,
self.token_b,
self.token_c,
@ -229,7 +221,7 @@ class NotifySlack(NotifyBase):
'attachments': [{
'title': title,
'text': body,
'color': HTML_NOTIFY_MAP[notify_type],
'color': self.asset.html_color[notify_type],
# Time
'ts': time(),
'footer': self.app_id,
@ -285,3 +277,48 @@ class NotifySlack(NotifyBase):
self.throttle()
return has_error
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
# The first token is stored in the hostnamee
token_a = results['host']
# Now fetch the remaining tokens
try:
token_b, token_c = filter(
bool, NotifyBase.split_path(results['fullpath']))[0:2]
except (AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
token_b = None
token_c = None
try:
channels = '#'.join(filter(
bool, NotifyBase.split_path(results['fullpath']))[2:])
except (AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
channels = None
results['token_a'] = token_a
results['token_b'] = token_b
results['token_c'] = token_c
results['channels'] = channels
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Telegram Notify Wrapper
#
# Copyright (C) 2016-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -53,9 +53,6 @@ from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import HTTP_ERROR_MAP
# Telegram uses the http protocol with JSON requests
TELEGRAM_BOT_URL = 'https://api.telegram.org/bot'
# Token required as part of the API request
# allow the word 'bot' infront
VALIDATE_BOT_TOKEN = re.compile(
@ -86,11 +83,11 @@ class NotifyTelegram(NotifyBase):
A wrapper for Telegram Notifications
"""
# The default protocol
PROTOCOL = 'tgram'
# The default secure protocol
SECURE_PROTOCOL = 'tgram'
secure_protocol = 'tgram'
# Telegram uses the http protocol with JSON requests
notify_url = 'https://api.telegram.org/bot'
def __init__(self, bot_token, chat_ids, **kwargs):
"""
@ -98,9 +95,7 @@ class NotifyTelegram(NotifyBase):
"""
super(NotifyTelegram, self).__init__(
title_maxlen=250, body_maxlen=4096,
image_size=TELEGRAM_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
**kwargs)
image_size=TELEGRAM_IMAGE_XY, **kwargs)
if bot_token is None:
raise TypeError(
@ -157,7 +152,7 @@ class NotifyTelegram(NotifyBase):
}
url = '%s%s/%s' % (
TELEGRAM_BOT_URL,
self.notify_url,
self.bot_token,
'getMe'
)
@ -209,7 +204,7 @@ class NotifyTelegram(NotifyBase):
return chat_id
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Telegram Notification
"""
@ -224,13 +219,11 @@ class NotifyTelegram(NotifyBase):
image_url = None
if self.include_image:
image_content = self.image_raw(
notify_type,
)
image_content = self.image_raw(notify_type)
if image_content is not None:
# prepare our eimage URL
# prepare our image URL
image_url = '%s%s/%s' % (
TELEGRAM_BOT_URL,
self.notify_url,
self.bot_token,
'sendPhoto'
)
@ -239,7 +232,7 @@ class NotifyTelegram(NotifyBase):
files = {'photo': ('%s.png' % notify_type, image_content)}
url = '%s%s/%s' % (
TELEGRAM_BOT_URL,
self.notify_url,
self.bot_token,
'sendMessage'
)
@ -410,3 +403,87 @@ class NotifyTelegram(NotifyBase):
self.throttle()
return has_error
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
# super() is formatted slightly different when dealing with
# static method inheritance
results = NotifyBase.parse_url(url)
if results:
# We're done early
return results
# This is a dirty hack; but it's the only work around to
# tgram:// messages since the bot_token has a colon in it.
# It invalidates an normal URL.
# This hack searches for this bogus URL and corrects it
# so we can properly load it further down. The other
# alternative is to ask users to actually change the colon
# into a slash (which will work too), but it's more likely
# to cause confusion... So this is the next best thing
tgram = re.match(
r'(?P<protocol>%s://)(bot)?(?P<prefix>([a-z0-9_-]+)'
r'(:[a-z0-9_-]+)?@)?(?P<btoken_a>[0-9]+):+'
r'(?P<remaining>.*)$' % 'tgram',
url, re.I)
if not tgram:
# Content is simply not parseable
return None
if tgram.group('prefix'):
# Try again
result = NotifyBase.parse_url(
'%s%s%s/%s' % (
tgram.group('protocol'),
tgram.group('prefix'),
tgram.group('btoken_a'),
tgram.group('remaining'),
),
)
else:
# Try again
result = NotifyBase.parse_url(
'%s%s/%s' % (
tgram.group('protocol'),
tgram.group('btoken_a'),
tgram.group('remaining'),
),
)
# The first token is stored in the hostnamee
bot_token_a = result['host']
# Now fetch the remaining tokens
try:
bot_token_b = filter(
bool, NotifyBase.split_path(result['fullpath']))[0]
bot_token = '%s:%s' % (bot_token_a, bot_token_b)
except (AttributeError, IndexError):
# Force a bad value that will get caught in parsing later
bot_token = None
try:
chat_ids = ','.join(
filter(bool, NotifyBase.split_path(result['fullpath']))[1:])
except (AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
chat_ids = None
# Return our results
return result + {
'bot_token': bot_token,
'chat_ids': chat_ids,
}.items()

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# (Super) Toasty Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,17 +19,14 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from urllib import quote
import requests
import re
import requests
from urllib import quote
from urllib import unquote
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import NotifyImageSize
from .NotifyBase import HTTP_ERROR_MAP
# Toasty uses the http protocol with JSON requests
TOASTY_URL = 'http://api.supertoasty.com/notify/'
from ..common import NotifyImageSize
# Image Support (128x128)
TOASTY_IMAGE_XY = NotifyImageSize.XY_128
@ -45,34 +42,34 @@ class NotifyToasty(NotifyBase):
"""
# The default protocol
PROTOCOL = 'toasty'
protocol = 'toasty'
# The default secure protocol
SECURE_PROTOCOL = 'toasty'
# Toasty uses the http protocol with JSON requests
notify_url = 'http://api.supertoasty.com/notify/'
def __init__(self, devices, **kwargs):
"""
Initialize Toasty Object
"""
super(NotifyToasty, self).__init__(
title_maxlen=250, body_maxlen=32768,
image_size=TOASTY_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
title_maxlen=250, body_maxlen=32768, image_size=TOASTY_IMAGE_XY,
**kwargs)
if isinstance(devices, basestring):
self.devices = filter(bool, DEVICES_LIST_DELIM.split(
devices,
))
elif isinstance(devices, (tuple, list)):
self.devices = devices
else:
raise TypeError('You must specify at least 1 device.')
if not self.user:
raise TypeError('You must specify a username.')
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Toasty Notification
"""
@ -105,7 +102,7 @@ class NotifyToasty(NotifyBase):
payload['image'] = image_url
# URL to transmit content via
url = '%s%s' % (TOASTY_URL, device)
url = '%s%s' % (self.notify_url, device)
self.logger.debug('Toasty POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
@ -153,3 +150,28 @@ class NotifyToasty(NotifyBase):
self.throttle()
return has_error
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
try:
devices = unquote(results['fullpath'])
except AttributeError:
devices = ''
# Store our devices
results['devices'] = '%s/%s' % (results['host'], devices)
return results

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Twitter Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -21,7 +21,6 @@
from . import tweepy
from ..NotifyBase import NotifyBase
from ..NotifyBase import NotifyFormat
# Direct Messages have not image support
TWITTER_IMAGE_XY = None
@ -33,11 +32,8 @@ class NotifyTwitter(NotifyBase):
"""
# The default protocol
PROTOCOL = 'tweet'
# The default secure protocol
SECURE_PROTOCOL = 'tweet'
secure_protocol = 'tweet'
def __init__(self, ckey, csecret, akey, asecret, **kwargs):
"""
@ -48,9 +44,7 @@ class NotifyTwitter(NotifyBase):
"""
super(NotifyTwitter, self).__init__(
title_maxlen=250, body_maxlen=4096,
image_size=TWITTER_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
**kwargs)
image_size=TWITTER_IMAGE_XY, **kwargs)
if not ckey:
raise TypeError(
@ -80,6 +74,7 @@ class NotifyTwitter(NotifyBase):
try:
# Attempt to Establish a connection to Twitter
self.auth = tweepy.OAuthHandler(ckey, csecret)
# Apply our Access Tokens
self.auth.set_access_token(akey, asecret)
@ -91,7 +86,7 @@ class NotifyTwitter(NotifyBase):
return
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform Twitter Notification
"""
@ -114,3 +109,40 @@ class NotifyTwitter(NotifyBase):
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
# The first token is stored in the hostnamee
consumer_key = results['host']
# Now fetch the remaining tokens
try:
consumer_secret, access_token_key, access_token_secret = \
filter(bool, NotifyBase.split_path(results['fullpath']))[0:3]
except (AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
consumer_secret = None
access_token_key = None
access_token_secret = None
results['ckey'] = consumer_key,
results['csecret'] = consumer_secret,
results['akey'] = access_token_key,
results['asecret'] = access_token_secret,
return results

View File

@ -1,4 +1,4 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
from . import NotifyTwitter
__all__ = [

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# XBMC Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,22 +19,22 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from json import dumps
import re
import requests
from json import dumps
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import NotifyType
from .NotifyBase import NotifyImageSize
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyType
from ..common import NotifyImageSize
# Image Support (128x128)
XBMC_IMAGE_XY = NotifyImageSize.XY_128
# XBMC uses the http protocol with JSON requests
XBMC_PORT = 8080
# XBMC uses v2
XBMC_PROTOCOL_V2 = 2
# Kodi uses v6
XBMC_PROTOCOL_V6 = 6
SUPPORTED_XBMC_PROTOCOLS = (
@ -48,11 +48,14 @@ class NotifyXBMC(NotifyBase):
A wrapper for XBMC/KODI Notifications
"""
# The default protocol
PROTOCOL = ('xbmc', 'kodi')
# The default protocols
protocol = ('xbmc', 'kodi')
# The default secure protocol
SECURE_PROTOCOL = ('xbmc', 'kodis')
# The default secure protocols
secure_protocol = ('xbmc', 'kodis')
# XBMC uses the http protocol with JSON requests
default_port = 8080
def __init__(self, **kwargs):
"""
@ -60,17 +63,16 @@ class NotifyXBMC(NotifyBase):
"""
super(NotifyXBMC, self).__init__(
title_maxlen=250, body_maxlen=32768,
image_size=XBMC_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
**kwargs)
image_size=XBMC_IMAGE_XY, **kwargs)
if self.secure:
self.schema = 'https'
else:
self.schema = 'http'
if not self.port:
self.port = XBMC_PORT
self.port = self.default_port
self.protocol = kwargs.get('protocol', XBMC_PROTOCOL_V2)
if self.protocol not in SUPPORTED_XBMC_PROTOCOLS:
@ -152,11 +154,17 @@ class NotifyXBMC(NotifyBase):
return (headers, dumps(payload))
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform XBMC Notification
"""
# Limit results to just the first 2 line otherwise
# there is just to much content to display
body = re.split('[\r\n]+', body)
body[0] = body[0].strip('#').strip()
body = '\r\n'.join(body[0:2])
if self.protocol == XBMC_PROTOCOL_V2:
# XBMC v2.0
(headers, payload) = self._payload_20(
@ -205,6 +213,7 @@ class NotifyXBMC(NotifyBase):
# Return; we're done
return False
else:
self.logger.info('Sent XBMC/KODI notification.')

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# XML Notify Wrapper
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -19,14 +19,13 @@
# You should have received a copy of the GNU General Public License
# along with apprise. If not, see <http://www.gnu.org/licenses/>.
from urllib import quote
import requests
import re
import requests
from urllib import quote
from .NotifyBase import NotifyBase
from .NotifyBase import NotifyFormat
from .NotifyBase import NotifyImageSize
from .NotifyBase import HTTP_ERROR_MAP
from ..common import NotifyImageSize
# Image Support (128x128)
XML_IMAGE_XY = NotifyImageSize.XY_128
@ -38,10 +37,10 @@ class NotifyXML(NotifyBase):
"""
# The default protocol
PROTOCOL = 'xml'
protocol = 'xml'
# The default secure protocol
SECURE_PROTOCOL = 'xmls'
secure_protocol = 'xmls'
def __init__(self, **kwargs):
"""
@ -49,9 +48,7 @@ class NotifyXML(NotifyBase):
"""
super(NotifyXML, self).__init__(
title_maxlen=250, body_maxlen=32768,
image_size=XML_IMAGE_XY,
notify_format=NotifyFormat.TEXT,
**kwargs)
image_size=XML_IMAGE_XY, **kwargs)
self.payload = """<?xml version='1.0' encoding='utf-8'?>
<soapenv:Envelope
@ -80,7 +77,7 @@ class NotifyXML(NotifyBase):
return
def _notify(self, title, body, notify_type, **kwargs):
def notify(self, title, body, notify_type, **kwargs):
"""
Perform XML Notification
"""

View File

@ -1,8 +1,8 @@
# -*- encoding: utf-8 -*-
# -*- coding: utf-8 -*-
#
# Our service wrappers
#
# Copyright (C) 2014-2017 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2017 Chris Caron <lead2gold@gmail.com>
#
# This file is part of apprise.
#
@ -40,11 +40,19 @@ from .NotifyTelegram import NotifyTelegram
from .NotifyMatterMost import NotifyMatterMost
from .NotifyPushjet import NotifyPushjet
from ..common import NotifyImageSize
from ..common import NOTIFY_IMAGE_SIZES
from ..common import NotifyType
from ..common import NOTIFY_TYPES
__all__ = [
# Notification Services
'NotifyBoxcar', 'NotifyEmail', 'NotifyFaast', 'NotifyGrowl', 'NotifyJSON',
'NotifyMyAndroid', 'NotifyProwl', 'NotifyPushalot', 'NotifyPushBullet',
'NotifyPushover', 'NotifyRocketChat', 'NotifyToasty', 'NotifyTwitter',
'NotifyXBMC', 'NotifyXML', 'NotifySlack', 'NotifyJoin', 'NotifyTelegram',
'NotifyMatterMost', 'NotifyPushjet'
'NotifyMatterMost', 'NotifyPushjet',
# Reference
'NotifyImageSize', 'NOTIFY_IMAGE_SIZES', 'NotifyType', 'NOTIFY_TYPES',
]

View File

@ -1,26 +0,0 @@
#!/usr/bin/env python
def _main():
"""\
Usage: apprise [options] [URL ...]
Send notifications to a variety of different supported services.
See also https://github.com/caronc/apprise
URL The notification service URL
Options:
-h, --help show this message
-t TITLE, --title TITLE Specify a notification title.
-b BODY, --body BODY Specify a notification body.
-i IMGURL, --image IMGURL Specify an image to send with the notification.
The image should be in the format of a URL
string such as file:///local/path/to/file.png or
a remote site like: http://my.host/my.image.png.
"""
if __name__ == '__main__':
_main()

75
cli/notify.py Executable file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import click
import logging
import sys
from apprise import Apprise
# Logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
@click.command()
@click.option('--title', '-t', default=None, type=str,
help='Specify the message title.')
@click.option('--body', '-b', default=None, type=str,
help='Specify the message body.')
@click.option('--theme', '-T', default='default', type=str,
help='Specify the default theme.')
@click.option('--image-url', '-i', default=None, type=str,
help='Specify the image URL.')
@click.argument('urls', nargs=-1)
def _main(title, body, urls, theme, image_url):
"""
Notify all specified servers
"""
if not (title and body):
logger.error('Neither a message body or title was specified.')
return 1
if not urls:
logger.error('You must specify at least one server URL')
return 1
# Create our object
apprise = Apprise()
# Load our inventory up
for url in urls:
apprise.add(url)
# now print it out
apprise.notify(title=title, body=body)
return 0
# """\
# Usage: apprise [options] [URL ...]
#
# Send notifications to a variety of different supported services.
# See also https://github.com/caronc/apprise
#
# URL The notification service URL
#
# Options:
#
# -h, --help show this message
# -t TITLE, --title TITLE Specify a notification title.
# -b BODY, --body BODY Specify a notification body.
# -i IMGURL, --image IMGURL Specify an image to send with the notification.
# The image should be in the format of a URL
# string such as file:///local/path/to/file.png or
# a remote site like: http://my.host/my.image.png.
# """
if __name__ == '__main__':
exit(_main())

View File

@ -6,4 +6,4 @@ requests-oauthlib
oauthlib
urllib3
six
click
click >= 5.0

View File

@ -2,3 +2,13 @@
tag_build =
tag_date = 0
tag_svn_revision = 0
[pycodestyle]
# We exclude packages we don't maintain
exclude = gntp,tweepy,pushjet
ignore = E722
statistics = True
[coverage:run]
source=apprise
omit=*/gntp/*,*/tweepy/*,*/pushjet/*

View File

@ -19,6 +19,7 @@
import os
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
@ -44,11 +45,11 @@ setup(
author='Chris Caron',
author_email='lead2gold@gmail.com',
packages=find_packages(),
package_data={
'apprise': ['var/*'],
},
include_package_data=True,
scripts=['bin/apprise.py', ],
package_data={
'apprise': ['assets'],
},
scripts=['cli/notify.py', ],
install_requires=open('requirements.txt').readlines(),
classifiers=(
'Development Status :: 4 - Beta',
@ -61,4 +62,6 @@ setup(
),
entry_points={'console_scripts': console_scripts},
python_requires='>=2.7, <3',
test_suite='nose.collector',
tests_require=['nose', 'coverage', 'pycodestyle'],
)

230
test/test_utils.py Normal file
View File

@ -0,0 +1,230 @@
"""API properties.
"""
from __future__ import print_function
from __future__ import unicode_literals
from urllib import unquote
from apprise import utils
def test_parse_url():
"utils: parse_url() testing """
result = utils.parse_url('http://hostname')
assert(result['schema'] == 'http')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] is None)
assert(result['path'] is None)
assert(result['query'] is None)
assert(result['url'] == 'http://hostname')
assert(result['qsd'] == {})
result = utils.parse_url('http://hostname/')
assert(result['schema'] == 'http')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] == '/')
assert(result['path'] == '/')
assert(result['query'] is None)
assert(result['url'] == 'http://hostname/')
assert(result['qsd'] == {})
result = utils.parse_url('hostname')
assert(result['schema'] == 'http')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] is None)
assert(result['path'] is None)
assert(result['query'] is None)
assert(result['url'] == 'http://hostname')
assert(result['qsd'] == {})
result = utils.parse_url('http://hostname////')
assert(result['schema'] == 'http')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] == '/')
assert(result['path'] == '/')
assert(result['query'] is None)
assert(result['url'] == 'http://hostname/')
assert(result['qsd'] == {})
result = utils.parse_url('http://hostname:40////')
assert(result['schema'] == 'http')
assert(result['host'] == 'hostname')
assert(result['port'] == 40)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] == '/')
assert(result['path'] == '/')
assert(result['query'] is None)
assert(result['url'] == 'http://hostname:40/')
assert(result['qsd'] == {})
result = utils.parse_url('HTTP://HoStNaMe:40/test.php')
assert(result['schema'] == 'http')
assert(result['host'] == 'HoStNaMe')
assert(result['port'] == 40)
assert(result['user'] is None)
assert(result['password'] is None)
assert(result['fullpath'] == '/test.php')
assert(result['path'] == '/')
assert(result['query'] == 'test.php')
assert(result['url'] == 'http://HoStNaMe:40/test.php')
assert(result['qsd'] == {})
result = utils.parse_url('HTTPS://user@hostname/test.py')
assert(result['schema'] == 'https')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] == 'user')
assert(result['password'] is None)
assert(result['fullpath'] == '/test.py')
assert(result['path'] == '/')
assert(result['query'] == 'test.py')
assert(result['url'] == 'https://user@hostname/test.py')
assert(result['qsd'] == {})
result = utils.parse_url(' HTTPS://///user@@@hostname///test.py ')
assert(result['schema'] == 'https')
assert(result['host'] == 'hostname')
assert(result['port'] is None)
assert(result['user'] == 'user')
assert(result['password'] is None)
assert(result['fullpath'] == '/test.py')
assert(result['path'] == '/')
assert(result['query'] == 'test.py')
assert(result['url'] == 'https://user@hostname/test.py')
assert(result['qsd'] == {})
result = utils.parse_url(
'HTTPS://user:password@otherHost/full///path/name/',
)
assert(result['schema'] == 'https')
assert(result['host'] == 'otherHost')
assert(result['port'] is None)
assert(result['user'] == 'user')
assert(result['password'] == 'password')
assert(result['fullpath'] == '/full/path/name/')
assert(result['path'] == '/full/path/name/')
assert(result['query'] is None)
assert(result['url'] == 'https://user:password@otherHost/full/path/name/')
assert(result['qsd'] == {})
# Handle garbage
assert(utils.parse_url(None) is None)
result = utils.parse_url(
'mailto://user:password@otherHost/lead2gold@gmail.com' +
'?from=test@test.com&name=Chris%20Caron&format=text'
)
assert(result['schema'] == 'mailto')
assert(result['host'] == 'otherHost')
assert(result['port'] is None)
assert(result['user'] == 'user')
assert(result['password'] == 'password')
assert(unquote(result['fullpath']) == '/lead2gold@gmail.com')
assert(result['path'] == '/')
assert(unquote(result['query']) == 'lead2gold@gmail.com')
assert(unquote(
result['url']) ==
'mailto://user:password@otherHost/lead2gold@gmail.com')
assert(len(result['qsd']) == 3)
assert('name' in result['qsd'])
assert(unquote(result['qsd']['name']) == 'Chris Caron')
assert('from' in result['qsd'])
assert(unquote(result['qsd']['from']) == 'test@test.com')
assert('format' in result['qsd'])
assert(unquote(result['qsd']['format']) == 'text')
# Test Passwords with question marks ?; not supported
result = utils.parse_url(
'http://user:pass.with.?question@host'
)
assert(result is None)
def test_parse_bool():
"utils: parse_bool() testing """
assert(utils.parse_bool('Enabled', None) is True)
assert(utils.parse_bool('Disabled', None) is False)
assert(utils.parse_bool('Allow', None) is True)
assert(utils.parse_bool('Deny', None) is False)
assert(utils.parse_bool('Yes', None) is True)
assert(utils.parse_bool('YES', None) is True)
assert(utils.parse_bool('Always', None) is True)
assert(utils.parse_bool('No', None) is False)
assert(utils.parse_bool('NO', None) is False)
assert(utils.parse_bool('NEVER', None) is False)
assert(utils.parse_bool('TrUE', None) is True)
assert(utils.parse_bool('tRUe', None) is True)
assert(utils.parse_bool('FAlse', None) is False)
assert(utils.parse_bool('F', None) is False)
assert(utils.parse_bool('T', None) is True)
assert(utils.parse_bool('0', None) is False)
assert(utils.parse_bool('1', None) is True)
assert(utils.parse_bool('True', None) is True)
assert(utils.parse_bool('Yes', None) is True)
assert(utils.parse_bool(1, None) is True)
assert(utils.parse_bool(0, None) is False)
assert(utils.parse_bool(True, None) is True)
assert(utils.parse_bool(False, None) is False)
# only the int of 0 will return False since the function
# casts this to a boolean
assert(utils.parse_bool(2, None) is True)
# An empty list is still false
assert(utils.parse_bool([], None) is False)
# But a list that contains something is True
assert(utils.parse_bool(['value', ], None) is True)
# Use Default (which is False)
assert(utils.parse_bool('OhYeah') is False)
# Adjust Default and get a different result
assert(utils.parse_bool('OhYeah', True) is True)
def test_parse_list():
"utils: parse_list() testing """
# A simple single array entry (As str)
results = utils.parse_list(
'.mkv,.avi,.divx,.xvid,.mov,.wmv,.mp4,.mpg,.mpeg,.vob,.iso')
assert(results == [
'.divx', '.iso', '.mkv', '.mov', '.mpg', '.avi', '.mpeg', '.vob',
'.xvid', '.wmv', '.mp4',
])
# Now 2 lists with lots of duplicates and other delimiters
results = utils.parse_list(
'.mkv,.avi,.divx,.xvid,.mov,.wmv,.mp4,.mpg .mpeg,.vob,,; ;',
'.mkv,.avi,.divx,.xvid,.mov .wmv,.mp4;.mpg,.mpeg,'
'.vob,.iso')
assert(results == [
'.divx', '.iso', '.mkv', '.mov', '.mpg', '.avi', '.mpeg', '.vob',
'.xvid', '.wmv', '.mp4',
])
# Now a list with extras we want to add as strings
# empty entries are removed
results = utils.parse_list([
'.divx', '.iso', '.mkv', '.mov', '', ' ', '.avi', '.mpeg', '.vob',
'.xvid', '.mp4'], '.mov,.wmv,.mp4,.mpg')
assert(results == [
'.divx', '.wmv', '.iso', '.mkv', '.mov', '.mpg', '.avi', '.vob',
'.xvid', '.mpeg', '.mp4',
])