mirror of
https://github.com/caronc/apprise.git
synced 2025-01-31 18:39:16 +01:00
refactored how +/- prefixed url arguments are handled
This commit is contained in:
parent
cdd72086ee
commit
b1133be85b
@ -176,7 +176,6 @@ class NotifyBase(object):
|
|||||||
|
|
||||||
self.user = kwargs.get('user')
|
self.user = kwargs.get('user')
|
||||||
self.password = kwargs.get('password')
|
self.password = kwargs.get('password')
|
||||||
self.headers = kwargs.get('headers')
|
|
||||||
|
|
||||||
if 'format' in kwargs:
|
if 'format' in kwargs:
|
||||||
# Store the specified format if specified
|
# Store the specified format if specified
|
||||||
@ -536,6 +535,4 @@ class NotifyBase(object):
|
|||||||
if 'user' in results['qsd']:
|
if 'user' in results['qsd']:
|
||||||
results['user'] = results['qsd']['user']
|
results['user'] = results['qsd']['user']
|
||||||
|
|
||||||
results['headers'] = {k[1:]: v for k, v in results['qsd'].items()
|
|
||||||
if re.match(r'^-.', k)}
|
|
||||||
return results
|
return results
|
||||||
|
@ -91,10 +91,19 @@ class NotifyIFTTT(NotifyBase):
|
|||||||
notify_url = 'https://maker.ifttt.com/' \
|
notify_url = 'https://maker.ifttt.com/' \
|
||||||
'trigger/{event}/with/key/{webhook_id}'
|
'trigger/{event}/with/key/{webhook_id}'
|
||||||
|
|
||||||
def __init__(self, webhook_id, events, **kwargs):
|
def __init__(self, webhook_id, events, add_tokens=None, del_tokens=None,
|
||||||
|
**kwargs):
|
||||||
"""
|
"""
|
||||||
Initialize IFTTT Object
|
Initialize IFTTT Object
|
||||||
|
|
||||||
|
add_tokens can optionally be a dictionary of key/value pairs
|
||||||
|
that you want to include in the IFTTT post to the server.
|
||||||
|
|
||||||
|
del_tokens can optionally be a list/tuple/set of tokens
|
||||||
|
that you want to eliminate from the IFTTT post. There isn't
|
||||||
|
much real functionality to this one unless you want to remove
|
||||||
|
reference to Value1, Value2, and/or Value3
|
||||||
|
|
||||||
"""
|
"""
|
||||||
super(NotifyIFTTT, self).__init__(**kwargs)
|
super(NotifyIFTTT, self).__init__(**kwargs)
|
||||||
|
|
||||||
@ -111,6 +120,22 @@ class NotifyIFTTT(NotifyBase):
|
|||||||
# Store our APIKey
|
# Store our APIKey
|
||||||
self.webhook_id = webhook_id
|
self.webhook_id = webhook_id
|
||||||
|
|
||||||
|
# Tokens to include in post
|
||||||
|
self.add_tokens = {}
|
||||||
|
if add_tokens:
|
||||||
|
self.add_tokens.update(add_tokens)
|
||||||
|
|
||||||
|
# Tokens to remove
|
||||||
|
self.del_tokens = []
|
||||||
|
if del_tokens is not None:
|
||||||
|
if isinstance(del_tokens, (list, tuple, set)):
|
||||||
|
self.del_tokens = del_tokens
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise TypeError(
|
||||||
|
'del_token must be a list; {} was provided'.format(
|
||||||
|
str(type(del_tokens))))
|
||||||
|
|
||||||
def notify(self, title, body, notify_type, **kwargs):
|
def notify(self, title, body, notify_type, **kwargs):
|
||||||
"""
|
"""
|
||||||
Perform IFTTT Notification
|
Perform IFTTT Notification
|
||||||
@ -128,10 +153,13 @@ class NotifyIFTTT(NotifyBase):
|
|||||||
self.ifttt_default_type_key: notify_type,
|
self.ifttt_default_type_key: notify_type,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Eliminate empty fields; users wishing to cancel the use of the
|
# Add any new tokens expected (this can also potentially override
|
||||||
# self.ifttt_default_ entries can preset these keys to being
|
# any entries defined above)
|
||||||
# empty so that they get caught here and removed.
|
payload.update(self.add_tokens)
|
||||||
payload = {x: y for x, y in payload.items() if y}
|
|
||||||
|
# Eliminate fields flagged for removal
|
||||||
|
payload = {x: y for x, y in payload.items()
|
||||||
|
if x not in self.del_tokens}
|
||||||
|
|
||||||
# Track our failures
|
# Track our failures
|
||||||
error_count = 0
|
error_count = 0
|
||||||
@ -217,6 +245,10 @@ class NotifyIFTTT(NotifyBase):
|
|||||||
'overflow': self.overflow_mode,
|
'overflow': self.overflow_mode,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Store any new key/value pairs added to our list
|
||||||
|
args.update({'+{}'.format(k): v for k, v in self.add_tokens})
|
||||||
|
args.update({'-{}'.format(k): '' for k in self.del_tokens})
|
||||||
|
|
||||||
return '{schema}://{webhook_id}@{events}/?{args}'.format(
|
return '{schema}://{webhook_id}@{events}/?{args}'.format(
|
||||||
schema=self.secure_protocol,
|
schema=self.secure_protocol,
|
||||||
webhook_id=self.webhook_id,
|
webhook_id=self.webhook_id,
|
||||||
|
@ -52,9 +52,13 @@ class NotifyJSON(NotifyBase):
|
|||||||
# Allows the user to specify the NotifyImageSize object
|
# Allows the user to specify the NotifyImageSize object
|
||||||
image_size = NotifyImageSize.XY_128
|
image_size = NotifyImageSize.XY_128
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, headers, **kwargs):
|
||||||
"""
|
"""
|
||||||
Initialize JSON Object
|
Initialize JSON Object
|
||||||
|
|
||||||
|
headers can be a dictionary of key/value pairs that you want to
|
||||||
|
additionally include as part of the server headers to post with
|
||||||
|
|
||||||
"""
|
"""
|
||||||
super(NotifyJSON, self).__init__(**kwargs)
|
super(NotifyJSON, self).__init__(**kwargs)
|
||||||
|
|
||||||
@ -68,6 +72,11 @@ class NotifyJSON(NotifyBase):
|
|||||||
if not compat_is_basestring(self.fullpath):
|
if not compat_is_basestring(self.fullpath):
|
||||||
self.fullpath = '/'
|
self.fullpath = '/'
|
||||||
|
|
||||||
|
self.headers = {}
|
||||||
|
if headers:
|
||||||
|
# Store our extra headers
|
||||||
|
self.headers.update(headers)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def url(self):
|
def url(self):
|
||||||
@ -81,6 +90,9 @@ class NotifyJSON(NotifyBase):
|
|||||||
'overflow': self.overflow_mode,
|
'overflow': self.overflow_mode,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Append our headers into our args
|
||||||
|
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
|
||||||
|
|
||||||
# Determine Authentication
|
# Determine Authentication
|
||||||
auth = ''
|
auth = ''
|
||||||
if self.user and self.password:
|
if self.user and self.password:
|
||||||
@ -125,8 +137,8 @@ class NotifyJSON(NotifyBase):
|
|||||||
'Content-Type': 'application/json'
|
'Content-Type': 'application/json'
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.headers:
|
# Apply any/all header over-rides defined
|
||||||
headers.update(self.headers)
|
headers.update(self.headers)
|
||||||
|
|
||||||
auth = None
|
auth = None
|
||||||
if self.user:
|
if self.user:
|
||||||
@ -179,3 +191,23 @@ class NotifyJSON(NotifyBase):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_url(url):
|
||||||
|
"""
|
||||||
|
Parses the URL and returns enough arguments that can allow
|
||||||
|
us to substantiate this object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
results = NotifyBase.parse_url(url)
|
||||||
|
|
||||||
|
if not results:
|
||||||
|
# We're done early as we couldn't load the results
|
||||||
|
return results
|
||||||
|
|
||||||
|
# Add our headers that the user can potentially over-ride if they wish
|
||||||
|
# to to our returned result set
|
||||||
|
results['headers'] = results['qsd-']
|
||||||
|
results['headers'].update(results['qsd+'])
|
||||||
|
|
||||||
|
return results
|
||||||
|
@ -52,9 +52,13 @@ class NotifyXML(NotifyBase):
|
|||||||
# Allows the user to specify the NotifyImageSize object
|
# Allows the user to specify the NotifyImageSize object
|
||||||
image_size = NotifyImageSize.XY_128
|
image_size = NotifyImageSize.XY_128
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, headers=None, **kwargs):
|
||||||
"""
|
"""
|
||||||
Initialize XML Object
|
Initialize XML Object
|
||||||
|
|
||||||
|
headers can be a dictionary of key/value pairs that you want to
|
||||||
|
additionally include as part of the server headers to post with
|
||||||
|
|
||||||
"""
|
"""
|
||||||
super(NotifyXML, self).__init__(**kwargs)
|
super(NotifyXML, self).__init__(**kwargs)
|
||||||
|
|
||||||
@ -83,6 +87,11 @@ class NotifyXML(NotifyBase):
|
|||||||
if not compat_is_basestring(self.fullpath):
|
if not compat_is_basestring(self.fullpath):
|
||||||
self.fullpath = '/'
|
self.fullpath = '/'
|
||||||
|
|
||||||
|
self.headers = {}
|
||||||
|
if headers:
|
||||||
|
# Store our extra headers
|
||||||
|
self.headers.update(headers)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def url(self):
|
def url(self):
|
||||||
@ -96,6 +105,9 @@ class NotifyXML(NotifyBase):
|
|||||||
'overflow': self.overflow_mode,
|
'overflow': self.overflow_mode,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Append our headers into our args
|
||||||
|
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
|
||||||
|
|
||||||
# Determine Authentication
|
# Determine Authentication
|
||||||
auth = ''
|
auth = ''
|
||||||
if self.user and self.password:
|
if self.user and self.password:
|
||||||
@ -130,8 +142,8 @@ class NotifyXML(NotifyBase):
|
|||||||
'Content-Type': 'application/xml'
|
'Content-Type': 'application/xml'
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.headers:
|
# Apply any/all header over-rides defined
|
||||||
headers.update(self.headers)
|
headers.update(self.headers)
|
||||||
|
|
||||||
re_map = {
|
re_map = {
|
||||||
'{MESSAGE_TYPE}': NotifyBase.quote(notify_type),
|
'{MESSAGE_TYPE}': NotifyBase.quote(notify_type),
|
||||||
@ -197,3 +209,23 @@ class NotifyXML(NotifyBase):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_url(url):
|
||||||
|
"""
|
||||||
|
Parses the URL and returns enough arguments that can allow
|
||||||
|
us to substantiate this object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
results = NotifyBase.parse_url(url)
|
||||||
|
|
||||||
|
if not results:
|
||||||
|
# We're done early as we couldn't load the results
|
||||||
|
return results
|
||||||
|
|
||||||
|
# Add our headers that the user can potentially over-ride if they wish
|
||||||
|
# to to our returned result set
|
||||||
|
results['headers'] = results['qsd-']
|
||||||
|
results['headers'].update(results['qsd+'])
|
||||||
|
|
||||||
|
return results
|
||||||
|
112
apprise/utils.py
112
apprise/utils.py
@ -32,14 +32,12 @@ try:
|
|||||||
from urllib import unquote
|
from urllib import unquote
|
||||||
from urllib import quote
|
from urllib import quote
|
||||||
from urlparse import urlparse
|
from urlparse import urlparse
|
||||||
from urlparse import parse_qsl
|
|
||||||
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
# Python 3.x
|
# Python 3.x
|
||||||
from urllib.parse import unquote
|
from urllib.parse import unquote
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
from urllib.parse import parse_qsl
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -91,6 +89,12 @@ TIDY_NUX_TRIM_RE = re.compile(
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# The handling of custom arguments passed in the URL; we treat any
|
||||||
|
# argument (which would otherwise appear in the qsd area of our parse_url()
|
||||||
|
# function differently if they start with a + or - value
|
||||||
|
NOTIFY_CUSTOM_ADD_TOKENS = re.compile(r'^( |\+)(?P<key>.*)\s*')
|
||||||
|
NOTIFY_CUSTOM_DEL_TOKENS = re.compile(r'^-(?P<key>.*)\s*')
|
||||||
|
|
||||||
# Used for attempting to acquire the schema if the URL can't be parsed.
|
# Used for attempting to acquire the schema if the URL can't be parsed.
|
||||||
GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I)
|
GET_SCHEMA_RE = re.compile(r'\s*(?P<schema>[a-z0-9]{2,9})://.*$', re.I)
|
||||||
|
|
||||||
@ -143,6 +147,81 @@ def tidy_path(path):
|
|||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def parse_qsd(qs):
|
||||||
|
"""
|
||||||
|
Query String Dictionary Builder
|
||||||
|
|
||||||
|
A custom implimentation of the parse_qsl() function already provided
|
||||||
|
by Python. This function is slightly more light weight and gives us
|
||||||
|
more control over parsing out arguments such as the plus/+ symbol
|
||||||
|
at the head of a key/value pair.
|
||||||
|
|
||||||
|
qs should be a query string part made up as part of the URL such as
|
||||||
|
a=1&c=2&d=
|
||||||
|
|
||||||
|
a=1 gets interpreted as { 'a': '1' }
|
||||||
|
a= gets interpreted as { 'a': '' }
|
||||||
|
a gets interpreted as { 'a': '' }
|
||||||
|
|
||||||
|
|
||||||
|
This function returns a result object that fits with the apprise
|
||||||
|
expected parameters (populating the 'qsd' portion of the dictionary
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Our return result set:
|
||||||
|
result = {
|
||||||
|
# The arguments passed in (the parsed query). This is in a dictionary
|
||||||
|
# of {'key': 'val', etc }. Keys are all made lowercase before storing
|
||||||
|
# to simplify access to them.
|
||||||
|
'qsd': {},
|
||||||
|
|
||||||
|
# Detected Entries that start with + or - are additionally stored in
|
||||||
|
# these values (un-touched). The +/- however are stripped from their
|
||||||
|
# name before they are stored here.
|
||||||
|
'qsd+': {},
|
||||||
|
'qsd-': {},
|
||||||
|
}
|
||||||
|
|
||||||
|
pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
|
||||||
|
for name_value in pairs:
|
||||||
|
nv = name_value.split('=', 1)
|
||||||
|
# Handle case of a control-name with no equal sign
|
||||||
|
if len(nv) != 2:
|
||||||
|
nv.append('')
|
||||||
|
|
||||||
|
# Apprise keys can start with a + symbol; so we need to skip over
|
||||||
|
# the very first entry
|
||||||
|
key = '{}{}'.format(
|
||||||
|
'' if len(nv[0]) == 0 else nv[0][0],
|
||||||
|
'' if len(nv[0]) <= 1 else nv[0][1:].replace('+', ' '),
|
||||||
|
)
|
||||||
|
|
||||||
|
key = unquote(key)
|
||||||
|
key = '' if not key else key
|
||||||
|
|
||||||
|
val = nv[1].replace('+', ' ')
|
||||||
|
val = unquote(val)
|
||||||
|
val = '' if not val else val.strip()
|
||||||
|
|
||||||
|
# Always Query String Dictionary (qsd) for every entry we have
|
||||||
|
# content is always made lowercase for easy indexing
|
||||||
|
result['qsd'][key.lower().strip()] = val
|
||||||
|
|
||||||
|
# Check for tokens that start with a addition/plus symbol (+)
|
||||||
|
k = NOTIFY_CUSTOM_ADD_TOKENS.match(key)
|
||||||
|
if k is not None:
|
||||||
|
# Store content 'as-is'
|
||||||
|
result['qsd+'][k.group('key')] = val
|
||||||
|
|
||||||
|
# Check for tokens that start with a subtraction/hyphen symbol (-)
|
||||||
|
k = NOTIFY_CUSTOM_DEL_TOKENS.match(key)
|
||||||
|
if k is not None:
|
||||||
|
# Store content 'as-is'
|
||||||
|
result['qsd-'][k.group('key')] = val
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def parse_url(url, default_schema='http', verify_host=True):
|
def parse_url(url, default_schema='http', verify_host=True):
|
||||||
"""A function that greatly simplifies the parsing of a url
|
"""A function that greatly simplifies the parsing of a url
|
||||||
specified by the end user.
|
specified by the end user.
|
||||||
@ -190,10 +269,17 @@ def parse_url(url, default_schema='http', verify_host=True):
|
|||||||
'schema': None,
|
'schema': None,
|
||||||
# The schema
|
# The schema
|
||||||
'url': None,
|
'url': None,
|
||||||
# The arguments passed in (the parsed query)
|
# The arguments passed in (the parsed query). This is in a dictionary
|
||||||
# This is in a dictionary of {'key': 'val', etc }
|
# of {'key': 'val', etc }. Keys are all made lowercase before storing
|
||||||
|
# to simplify access to them.
|
||||||
# qsd = Query String Dictionary
|
# qsd = Query String Dictionary
|
||||||
'qsd': {}
|
'qsd': {},
|
||||||
|
|
||||||
|
# Detected Entries that start with + or - are additionally stored in
|
||||||
|
# these values (un-touched). The +/- however are stripped from their
|
||||||
|
# name before they are stored here.
|
||||||
|
'qsd+': {},
|
||||||
|
'qsd-': {},
|
||||||
}
|
}
|
||||||
|
|
||||||
qsdata = ''
|
qsdata = ''
|
||||||
@ -220,6 +306,11 @@ def parse_url(url, default_schema='http', verify_host=True):
|
|||||||
# No qsdata
|
# No qsdata
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Parse Query Arugments ?val=key&key=val
|
||||||
|
# while ensuring that all keys are lowercase
|
||||||
|
if qsdata:
|
||||||
|
result.update(parse_qsd(qsdata))
|
||||||
|
|
||||||
# Now do a proper extraction of data
|
# Now do a proper extraction of data
|
||||||
parsed = urlparse('http://%s' % host)
|
parsed = urlparse('http://%s' % host)
|
||||||
|
|
||||||
@ -231,6 +322,7 @@ def parse_url(url, default_schema='http', verify_host=True):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
result['fullpath'] = quote(unquote(tidy_path(parsed[2].strip())))
|
result['fullpath'] = quote(unquote(tidy_path(parsed[2].strip())))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Handle trailing slashes removed by tidy_path
|
# Handle trailing slashes removed by tidy_path
|
||||||
if result['fullpath'][-1] not in ('/', '\\') and \
|
if result['fullpath'][-1] not in ('/', '\\') and \
|
||||||
@ -242,16 +334,6 @@ def parse_url(url, default_schema='http', verify_host=True):
|
|||||||
# and therefore, no trailing slash
|
# and therefore, no trailing slash
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Parse Query Arugments ?val=key&key=val
|
|
||||||
# while ensureing that all keys are lowercase
|
|
||||||
if qsdata:
|
|
||||||
result['qsd'] = dict([(k.lower().strip(), v.strip())
|
|
||||||
for k, v in parse_qsl(
|
|
||||||
qsdata,
|
|
||||||
keep_blank_values=True,
|
|
||||||
strict_parsing=False,
|
|
||||||
)])
|
|
||||||
|
|
||||||
if not result['fullpath']:
|
if not result['fullpath']:
|
||||||
# Default
|
# Default
|
||||||
result['fullpath'] = None
|
result['fullpath'] = None
|
||||||
|
@ -176,12 +176,6 @@ def test_notify_base_urls():
|
|||||||
assert 'password' in results
|
assert 'password' in results
|
||||||
assert results['password'] == "newpassword"
|
assert results['password'] == "newpassword"
|
||||||
|
|
||||||
# pass headers
|
|
||||||
results = NotifyBase.parse_url(
|
|
||||||
'https://localhost:8080?-HeaderKey=HeaderValue')
|
|
||||||
assert 'headerkey' in results['headers']
|
|
||||||
assert results['headers']['headerkey'] == 'HeaderValue'
|
|
||||||
|
|
||||||
# User Handling
|
# User Handling
|
||||||
|
|
||||||
# user keyword over-rides default password
|
# user keyword over-rides default password
|
||||||
|
@ -272,6 +272,14 @@ TEST_URLS = (
|
|||||||
('ifttt://:@/', {
|
('ifttt://:@/', {
|
||||||
'instance': None,
|
'instance': None,
|
||||||
}),
|
}),
|
||||||
|
# A nicely formed ifttt url with 1 event and a new key/value store
|
||||||
|
('ifttt://WebHookID@EventID/?+TemplateKey=TemplateVal', {
|
||||||
|
'instance': plugins.NotifyIFTTT,
|
||||||
|
}),
|
||||||
|
# Removing certain keys:
|
||||||
|
('ifttt://WebHookID@EventID/?-Value1=&-Value2', {
|
||||||
|
'instance': plugins.NotifyIFTTT,
|
||||||
|
}),
|
||||||
# A nicely formed ifttt url with 2 events defined:
|
# A nicely formed ifttt url with 2 events defined:
|
||||||
('ifttt://WebHookID@EventID/EventID2/', {
|
('ifttt://WebHookID@EventID/EventID2/', {
|
||||||
'instance': plugins.NotifyIFTTT,
|
'instance': plugins.NotifyIFTTT,
|
||||||
@ -2236,6 +2244,52 @@ def test_notify_ifttt_plugin(mock_post, mock_get):
|
|||||||
notify_type=NotifyType.INFO) is True
|
notify_type=NotifyType.INFO) is True
|
||||||
|
|
||||||
|
|
||||||
|
# Test the addition of tokens
|
||||||
|
obj = plugins.NotifyIFTTT(
|
||||||
|
webhook_id=webhook_id, events=events,
|
||||||
|
add_tokens={'Test':'ValueA', 'Test2': 'ValueB'})
|
||||||
|
|
||||||
|
assert(isinstance(obj, plugins.NotifyIFTTT))
|
||||||
|
|
||||||
|
assert obj.notify(title='title', body='body',
|
||||||
|
notify_type=NotifyType.INFO) is True
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Invalid del_tokens entry
|
||||||
|
obj = plugins.NotifyIFTTT(
|
||||||
|
webhook_id=webhook_id, events=events,
|
||||||
|
del_tokens=plugins.NotifyIFTTT.ifttt_default_title_key)
|
||||||
|
|
||||||
|
# we shouldn't reach here
|
||||||
|
assert False
|
||||||
|
|
||||||
|
except TypeError:
|
||||||
|
# del_tokens must be a list, so passing a string will throw
|
||||||
|
# an exception.
|
||||||
|
assert True
|
||||||
|
|
||||||
|
assert(isinstance(obj, plugins.NotifyIFTTT))
|
||||||
|
|
||||||
|
assert obj.notify(title='title', body='body',
|
||||||
|
notify_type=NotifyType.INFO) is True
|
||||||
|
|
||||||
|
# Test removal of tokens by a list
|
||||||
|
obj = plugins.NotifyIFTTT(
|
||||||
|
webhook_id=webhook_id, events=events,
|
||||||
|
add_tokens={
|
||||||
|
'MyKey': 'MyValue'
|
||||||
|
},
|
||||||
|
del_tokens=(
|
||||||
|
plugins.NotifyIFTTT.ifttt_default_title_key,
|
||||||
|
plugins.NotifyIFTTT.ifttt_default_body_key,
|
||||||
|
plugins.NotifyIFTTT.ifttt_default_type_key,
|
||||||
|
))
|
||||||
|
|
||||||
|
assert(isinstance(obj, plugins.NotifyIFTTT))
|
||||||
|
|
||||||
|
assert obj.notify(title='title', body='body',
|
||||||
|
notify_type=NotifyType.INFO) is True
|
||||||
|
|
||||||
@mock.patch('requests.get')
|
@mock.patch('requests.get')
|
||||||
@mock.patch('requests.post')
|
@mock.patch('requests.post')
|
||||||
def test_notify_join_plugin(mock_post, mock_get):
|
def test_notify_join_plugin(mock_post, mock_get):
|
||||||
@ -3150,7 +3204,8 @@ def test_notify_overflow_split():
|
|||||||
title_maxlen = title_len
|
title_maxlen = title_len
|
||||||
|
|
||||||
# Enforce a body length
|
# Enforce a body length
|
||||||
body_maxlen = (body_len / 4)
|
# Wrap in int() so Python v3 doesn't convert the response into a float
|
||||||
|
body_maxlen = int(body_len / 4)
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(TestNotification, self).__init__(**kwargs)
|
super(TestNotification, self).__init__(**kwargs)
|
||||||
@ -3186,8 +3241,9 @@ def test_notify_overflow_split():
|
|||||||
# Enforce no title
|
# Enforce no title
|
||||||
title_maxlen = 0
|
title_maxlen = 0
|
||||||
|
|
||||||
# Enforce a body length
|
# Enforce a body length based on the title
|
||||||
body_maxlen = (title_len / 4)
|
# Wrap in int() so Python v3 doesn't convert the response into a float
|
||||||
|
body_maxlen = int(title_len / 4)
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(TestNotification, self).__init__(**kwargs)
|
super(TestNotification, self).__init__(**kwargs)
|
||||||
@ -3214,7 +3270,9 @@ def test_notify_overflow_split():
|
|||||||
|
|
||||||
# Due to the new line added to the end
|
# Due to the new line added to the end
|
||||||
assert len(chunks) == (
|
assert len(chunks) == (
|
||||||
(len(bulk) / TestNotification.body_maxlen) +
|
# wrap division in int() so Python 3 doesn't convert it to a float on
|
||||||
|
# us
|
||||||
|
int(len(bulk) / TestNotification.body_maxlen) +
|
||||||
(1 if len(bulk) % TestNotification.body_maxlen else 0))
|
(1 if len(bulk) % TestNotification.body_maxlen else 0))
|
||||||
|
|
||||||
for chunk in chunks:
|
for chunk in chunks:
|
||||||
|
@ -35,6 +35,26 @@ except ImportError:
|
|||||||
from apprise import utils
|
from apprise import utils
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_qsd():
|
||||||
|
"utils: parse_qsd() testing """
|
||||||
|
|
||||||
|
result = utils.parse_qsd('a=1&b=&c&d=abcd')
|
||||||
|
assert(isinstance(result, dict) is True)
|
||||||
|
assert(len(result) == 3)
|
||||||
|
assert 'qsd' in result
|
||||||
|
assert 'qsd+' in result
|
||||||
|
assert 'qsd-' in result
|
||||||
|
|
||||||
|
assert(len(result['qsd']) == 4)
|
||||||
|
assert 'a' in result['qsd']
|
||||||
|
assert 'b' in result['qsd']
|
||||||
|
assert 'c' in result['qsd']
|
||||||
|
assert 'd' in result['qsd']
|
||||||
|
|
||||||
|
assert(len(result['qsd-']) == 0)
|
||||||
|
assert(len(result['qsd+']) == 0)
|
||||||
|
|
||||||
|
|
||||||
def test_parse_url():
|
def test_parse_url():
|
||||||
"utils: parse_url() testing """
|
"utils: parse_url() testing """
|
||||||
|
|
||||||
@ -49,6 +69,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] is None)
|
assert(result['query'] is None)
|
||||||
assert(result['url'] == 'http://hostname')
|
assert(result['url'] == 'http://hostname')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
result = utils.parse_url('http://hostname/')
|
result = utils.parse_url('http://hostname/')
|
||||||
assert(result['schema'] == 'http')
|
assert(result['schema'] == 'http')
|
||||||
@ -61,6 +83,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] is None)
|
assert(result['query'] is None)
|
||||||
assert(result['url'] == 'http://hostname/')
|
assert(result['url'] == 'http://hostname/')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
result = utils.parse_url('hostname')
|
result = utils.parse_url('hostname')
|
||||||
assert(result['schema'] == 'http')
|
assert(result['schema'] == 'http')
|
||||||
@ -73,6 +97,61 @@ def test_parse_url():
|
|||||||
assert(result['query'] is None)
|
assert(result['query'] is None)
|
||||||
assert(result['url'] == 'http://hostname')
|
assert(result['url'] == 'http://hostname')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
|
result = utils.parse_url('http://hostname/?-KeY=Value')
|
||||||
|
assert(result['schema'] == 'http')
|
||||||
|
assert(result['host'] == 'hostname')
|
||||||
|
assert(result['port'] is None)
|
||||||
|
assert(result['user'] is None)
|
||||||
|
assert(result['password'] is None)
|
||||||
|
assert(result['fullpath'] == '/')
|
||||||
|
assert(result['path'] == '/')
|
||||||
|
assert(result['query'] is None)
|
||||||
|
assert(result['url'] == 'http://hostname/')
|
||||||
|
assert('-key' in result['qsd'])
|
||||||
|
assert(unquote(result['qsd']['-key']) == 'Value')
|
||||||
|
assert('KeY' in result['qsd-'])
|
||||||
|
assert(unquote(result['qsd-']['KeY']) == 'Value')
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
|
result = utils.parse_url('http://hostname/?+KeY=Value')
|
||||||
|
assert(result['schema'] == 'http')
|
||||||
|
assert(result['host'] == 'hostname')
|
||||||
|
assert(result['port'] is None)
|
||||||
|
assert(result['user'] is None)
|
||||||
|
assert(result['password'] is None)
|
||||||
|
assert(result['fullpath'] == '/')
|
||||||
|
assert(result['path'] == '/')
|
||||||
|
assert(result['query'] is None)
|
||||||
|
assert(result['url'] == 'http://hostname/')
|
||||||
|
assert('+key' in result['qsd'])
|
||||||
|
assert('KeY' in result['qsd+'])
|
||||||
|
assert(result['qsd+']['KeY'] == 'Value')
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
|
||||||
|
result = utils.parse_url(
|
||||||
|
'http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C')
|
||||||
|
assert(result['schema'] == 'http')
|
||||||
|
assert(result['host'] == 'hostname')
|
||||||
|
assert(result['port'] is None)
|
||||||
|
assert(result['user'] is None)
|
||||||
|
assert(result['password'] is None)
|
||||||
|
assert(result['fullpath'] == '/')
|
||||||
|
assert(result['path'] == '/')
|
||||||
|
assert(result['query'] is None)
|
||||||
|
assert(result['url'] == 'http://hostname/')
|
||||||
|
assert('+key' in result['qsd'])
|
||||||
|
assert('-key' in result['qsd'])
|
||||||
|
assert('key' in result['qsd'])
|
||||||
|
assert('KeY' in result['qsd+'])
|
||||||
|
assert(result['qsd+']['KeY'] == 'ValueA')
|
||||||
|
assert('kEy' in result['qsd-'])
|
||||||
|
assert(result['qsd-']['kEy'] == 'ValueB')
|
||||||
|
assert(result['qsd']['key'] == 'Value C')
|
||||||
|
assert(result['qsd']['+key'] == result['qsd+']['KeY'])
|
||||||
|
assert(result['qsd']['-key'] == result['qsd-']['kEy'])
|
||||||
|
|
||||||
result = utils.parse_url('http://hostname////')
|
result = utils.parse_url('http://hostname////')
|
||||||
assert(result['schema'] == 'http')
|
assert(result['schema'] == 'http')
|
||||||
@ -85,6 +164,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] is None)
|
assert(result['query'] is None)
|
||||||
assert(result['url'] == 'http://hostname/')
|
assert(result['url'] == 'http://hostname/')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
result = utils.parse_url('http://hostname:40////')
|
result = utils.parse_url('http://hostname:40////')
|
||||||
assert(result['schema'] == 'http')
|
assert(result['schema'] == 'http')
|
||||||
@ -97,6 +178,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] is None)
|
assert(result['query'] is None)
|
||||||
assert(result['url'] == 'http://hostname:40/')
|
assert(result['url'] == 'http://hostname:40/')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
result = utils.parse_url('HTTP://HoStNaMe:40/test.php')
|
result = utils.parse_url('HTTP://HoStNaMe:40/test.php')
|
||||||
assert(result['schema'] == 'http')
|
assert(result['schema'] == 'http')
|
||||||
@ -109,6 +192,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] == 'test.php')
|
assert(result['query'] == 'test.php')
|
||||||
assert(result['url'] == 'http://HoStNaMe:40/test.php')
|
assert(result['url'] == 'http://HoStNaMe:40/test.php')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
result = utils.parse_url('HTTPS://user@hostname/test.py')
|
result = utils.parse_url('HTTPS://user@hostname/test.py')
|
||||||
assert(result['schema'] == 'https')
|
assert(result['schema'] == 'https')
|
||||||
@ -121,6 +206,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] == 'test.py')
|
assert(result['query'] == 'test.py')
|
||||||
assert(result['url'] == 'https://user@hostname/test.py')
|
assert(result['url'] == 'https://user@hostname/test.py')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
result = utils.parse_url(' HTTPS://///user@@@hostname///test.py ')
|
result = utils.parse_url(' HTTPS://///user@@@hostname///test.py ')
|
||||||
assert(result['schema'] == 'https')
|
assert(result['schema'] == 'https')
|
||||||
@ -133,6 +220,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] == 'test.py')
|
assert(result['query'] == 'test.py')
|
||||||
assert(result['url'] == 'https://user@hostname/test.py')
|
assert(result['url'] == 'https://user@hostname/test.py')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
result = utils.parse_url(
|
result = utils.parse_url(
|
||||||
'HTTPS://user:password@otherHost/full///path/name/',
|
'HTTPS://user:password@otherHost/full///path/name/',
|
||||||
@ -147,6 +236,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] is None)
|
assert(result['query'] is None)
|
||||||
assert(result['url'] == 'https://user:password@otherHost/full/path/name/')
|
assert(result['url'] == 'https://user:password@otherHost/full/path/name/')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
# Handle garbage
|
# Handle garbage
|
||||||
assert(utils.parse_url(None) is None)
|
assert(utils.parse_url(None) is None)
|
||||||
@ -173,6 +264,8 @@ def test_parse_url():
|
|||||||
assert(unquote(result['qsd']['from']) == 'test@test.com')
|
assert(unquote(result['qsd']['from']) == 'test@test.com')
|
||||||
assert('format' in result['qsd'])
|
assert('format' in result['qsd'])
|
||||||
assert(unquote(result['qsd']['format']) == 'text')
|
assert(unquote(result['qsd']['format']) == 'text')
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
# Test Passwords with question marks ?; not supported
|
# Test Passwords with question marks ?; not supported
|
||||||
result = utils.parse_url(
|
result = utils.parse_url(
|
||||||
@ -194,6 +287,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] is None)
|
assert(result['query'] is None)
|
||||||
assert(result['url'] == 'http://nuxref.com')
|
assert(result['url'] == 'http://nuxref.com')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
# just host and path
|
# just host and path
|
||||||
result = utils.parse_url(
|
result = utils.parse_url(
|
||||||
@ -209,6 +304,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] == 'host')
|
assert(result['query'] == 'host')
|
||||||
assert(result['url'] == 'http://invalid/host')
|
assert(result['url'] == 'http://invalid/host')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
# just all out invalid
|
# just all out invalid
|
||||||
assert(utils.parse_url('?') is None)
|
assert(utils.parse_url('?') is None)
|
||||||
@ -227,6 +324,8 @@ def test_parse_url():
|
|||||||
assert(result['query'] is None)
|
assert(result['query'] is None)
|
||||||
assert(result['url'] == 'http://nuxref.com')
|
assert(result['url'] == 'http://nuxref.com')
|
||||||
assert(result['qsd'] == {})
|
assert(result['qsd'] == {})
|
||||||
|
assert(result['qsd-'] == {})
|
||||||
|
assert(result['qsd+'] == {})
|
||||||
|
|
||||||
|
|
||||||
def test_parse_bool():
|
def test_parse_bool():
|
||||||
|
Loading…
Reference in New Issue
Block a user