improved URL parsing (#544)

This commit is contained in:
Chris Caron 2022-03-14 08:28:25 -04:00 committed by GitHub
parent 908f2c1e27
commit 259462efcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 310 additions and 35 deletions

View File

@ -43,6 +43,7 @@ from ..common import NotifyFormat
from ..utils import parse_bool
from ..utils import parse_list
from ..utils import apply_template
from ..utils import is_hostname
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
@ -287,6 +288,22 @@ class NotifyMatrix(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
elif not is_hostname(self.host):
msg = 'An invalid Matrix Hostname ({}) was specified'\
.format(self.host)
self.logger.warning(msg)
raise TypeError(msg)
else:
# Verify port if specified
if self.port is not None and not (
isinstance(self.port, int)
and self.port >= self.template_tokens['port']['min']
and self.port <= self.template_tokens['port']['max']):
msg = 'An invalid Matrix Port ({}) was specified'\
.format(self.port)
self.logger.warning(msg)
raise TypeError(msg)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Matrix Notification

View File

@ -565,7 +565,7 @@ def parse_qsd(qs):
return result
def parse_url(url, default_schema='http', verify_host=True):
def parse_url(url, default_schema='http', verify_host=True, strict_port=False):
"""A function that greatly simplifies the parsing of a url
specified by the end user.
@ -697,13 +697,29 @@ def parse_url(url, default_schema='http', verify_host=True):
# and it's already assigned
pass
# Max port is 65535 so (1,5 digits)
match = re.search(
r'^(?P<host>.+):(?P<port>[1-9][0-9]{0,4})$', result['host'])
if match:
# Port Parsing
pmatch = re.search(
r'^(?P<host>([[0-9a-f:]+]|[^:]+)):(?P<port>[^:]*)$',
result['host'])
if pmatch:
# Separate our port from our hostname (if port is detected)
result['host'] = match.group('host')
result['port'] = int(match.group('port'))
result['host'] = pmatch.group('host')
try:
# If we're dealing with an integer, go ahead and convert it
# otherwise return an 'x' which will raise a ValueError
#
# This small extra check allows us to treat floats/doubles
# as strings. Hence a value like '4.2' won't be converted to a 4
# (and the .2 lost)
result['port'] = int(
pmatch.group('port')
if re.search(r'[0-9]', pmatch.group('port')) else 'x')
except ValueError:
if verify_host:
# Invalid Host Specified
return None
if verify_host:
# Verify and Validate our hostname
@ -713,6 +729,26 @@ def parse_url(url, default_schema='http', verify_host=True):
# some indication as to what went wrong
return None
# Max port is 65535 and min is 1
if isinstance(result['port'], int) and not ((
not strict_port or (
strict_port and
result['port'] > 0 and result['port'] <= 65535))):
# An invalid port was specified
return None
elif pmatch and not isinstance(result['port'], int):
if strict_port:
# Store port
result['port'] = pmatch.group('port').strip()
else:
# Fall back
result['port'] = None
result['host'] = '{}:{}'.format(
pmatch.group('host'), pmatch.group('port'))
# Re-assemble cleaned up version of the url
result['url'] = '%s://' % result['schema']
if isinstance(result['user'], six.string_types):
@ -725,8 +761,12 @@ def parse_url(url, default_schema='http', verify_host=True):
result['url'] += '@'
result['url'] += result['host']
if result['port']:
result['url'] += ':%d' % result['port']
if result['port'] is not None:
try:
result['url'] += ':%d' % result['port']
except TypeError:
result['url'] += ':%s' % result['port']
if result['fullpath']:
result['url'] += result['fullpath']

View File

@ -119,6 +119,20 @@ apprise_url_tests = (
# user and token specified; image set to True
'instance': plugins.NotifyMatrix,
}),
# A Bunch of bad ports
('matrixs://user:pass@hostname:port/#room_alias', {
# Invalid Port specified (was a string)
'instance': TypeError,
}),
('matrixs://user:pass@hostname:0/#room_alias', {
# Invalid Port specified (was a string)
'instance': TypeError,
}),
('matrixs://user:pass@hostname:65536/#room_alias', {
# Invalid Port specified (was a string)
'instance': TypeError,
}),
# More general testing...
('matrixs://user@{}?mode=t2bot&format=markdown&image=True'
.format('a' * 64), {
# user and token specified; image set to True
@ -206,25 +220,26 @@ def test_plugin_matrix_general(mock_post, mock_get):
mock_post.return_value = request
# Variation Initializations
obj = plugins.NotifyMatrix(targets='#abcd')
obj = plugins.NotifyMatrix(host='host', targets='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert isinstance(obj.url(), six.string_types) is True
# Registration successful
assert obj.send(body="test") is True
obj = plugins.NotifyMatrix(user='user', targets='#abcd')
obj = plugins.NotifyMatrix(host='host', user='user', targets='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert isinstance(obj.url(), six.string_types) is True
# Registration successful
assert obj.send(body="test") is True
obj = plugins.NotifyMatrix(password='passwd', targets='#abcd')
obj = plugins.NotifyMatrix(host='host', password='passwd', targets='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert isinstance(obj.url(), six.string_types) is True
# A username gets automatically generated in these cases
assert obj.send(body="test") is True
obj = plugins.NotifyMatrix(user='user', password='passwd', targets='#abcd')
obj = plugins.NotifyMatrix(
host='host', user='user', password='passwd', targets='#abcd')
assert isinstance(obj.url(), six.string_types) is True
assert isinstance(obj, plugins.NotifyMatrix) is True
# Registration Successful
@ -279,17 +294,18 @@ def test_plugin_matrix_general(mock_post, mock_get):
# Fails because we couldn't register because of 404 errors
assert obj.send(body="test") is False
obj = plugins.NotifyMatrix(user='test', targets='#abcd')
obj = plugins.NotifyMatrix(host='host', user='test', targets='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
# Fails because we still couldn't register
assert obj.send(user='test', password='passwd', body="test") is False
obj = plugins.NotifyMatrix(user='test', password='passwd', targets='#abcd')
obj = plugins.NotifyMatrix(
host='host', user='test', password='passwd', targets='#abcd')
assert isinstance(obj, plugins.NotifyMatrix) is True
# Fails because we still couldn't register
assert obj.send(body="test") is False
obj = plugins.NotifyMatrix(password='passwd', targets='#abcd')
obj = plugins.NotifyMatrix(host='host', password='passwd', targets='#abcd')
# Fails because we still couldn't register
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.send(body="test") is False
@ -317,7 +333,7 @@ def test_plugin_matrix_general(mock_post, mock_get):
request.content = dumps(response_obj)
request.status_code = requests.codes.ok
obj = plugins.NotifyMatrix(targets=None)
obj = plugins.NotifyMatrix(host='host', targets=None)
assert isinstance(obj, plugins.NotifyMatrix) is True
# Force a empty joined list response
@ -377,14 +393,15 @@ def test_plugin_matrix_fetch(mock_post, mock_get):
mock_post.side_effect = fetch_failed
obj = plugins.NotifyMatrix(
user='user', password='passwd', include_image=True)
host='host', user='user', password='passwd', include_image=True)
assert isinstance(obj, plugins.NotifyMatrix) is True
# We would hve failed to send our image notification
assert obj.send(user='test', password='passwd', body="test") is False
# Do the same query with no images to fetch
asset = AppriseAsset(image_path_mask=False, image_url_mask=False)
obj = plugins.NotifyMatrix(user='user', password='passwd', asset=asset)
obj = plugins.NotifyMatrix(
host='host', user='user', password='passwd', asset=asset)
assert isinstance(obj, plugins.NotifyMatrix) is True
# We would hve failed to send our notification
assert obj.send(user='test', password='passwd', body="test") is False
@ -412,7 +429,7 @@ def test_plugin_matrix_fetch(mock_post, mock_get):
mock_post.return_value = request
mock_get.return_value = request
obj = plugins.NotifyMatrix(include_image=True)
obj = plugins.NotifyMatrix(host='host', include_image=True)
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
assert obj._register() is True
@ -465,7 +482,7 @@ def test_plugin_matrix_auth(mock_post, mock_get):
mock_post.return_value = request
mock_get.return_value = request
obj = plugins.NotifyMatrix()
obj = plugins.NotifyMatrix(host='localhost')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
# logging out without an access_token is silently a success
@ -502,7 +519,7 @@ def test_plugin_matrix_auth(mock_post, mock_get):
assert obj.access_token is None
# So will login
obj = plugins.NotifyMatrix(user='user', password='password')
obj = plugins.NotifyMatrix(host='host', user='user', password='password')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj._login() is False
assert obj.access_token is None
@ -567,7 +584,7 @@ def test_plugin_matrix_rooms(mock_post, mock_get):
mock_post.return_value = request
mock_get.return_value = request
obj = plugins.NotifyMatrix()
obj = plugins.NotifyMatrix(host='host')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
@ -626,7 +643,7 @@ def test_plugin_matrix_rooms(mock_post, mock_get):
# Room creation
request.status_code = requests.codes.ok
obj = plugins.NotifyMatrix()
obj = plugins.NotifyMatrix(host='host')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
@ -671,7 +688,7 @@ def test_plugin_matrix_rooms(mock_post, mock_get):
# Room detection
request.status_code = requests.codes.ok
request.content = dumps(response_obj)
obj = plugins.NotifyMatrix()
obj = plugins.NotifyMatrix(host='localhost')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
@ -697,7 +714,7 @@ def test_plugin_matrix_rooms(mock_post, mock_get):
# Room id lookup
request.status_code = requests.codes.ok
obj = plugins.NotifyMatrix()
obj = plugins.NotifyMatrix(host='localhost')
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
@ -780,7 +797,7 @@ def test_plugin_matrix_image_errors(mock_post, mock_get):
mock_get.side_effect = mock_function_handing
mock_post.side_effect = mock_function_handing
obj = plugins.NotifyMatrix(include_image=True)
obj = plugins.NotifyMatrix(host='host', include_image=True)
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
@ -788,7 +805,7 @@ def test_plugin_matrix_image_errors(mock_post, mock_get):
# we had post errors (of any kind) we still report a failure.
assert obj.notify('test', 'test') is False
obj = plugins.NotifyMatrix(include_image=False)
obj = plugins.NotifyMatrix(host='host', include_image=False)
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
@ -817,13 +834,13 @@ def test_plugin_matrix_image_errors(mock_post, mock_get):
# Prepare Mock
mock_get.side_effect = mock_function_handing
mock_post.side_effect = mock_function_handing
obj = plugins.NotifyMatrix(include_image=True)
obj = plugins.NotifyMatrix(host='host', include_image=True)
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None
assert obj.notify('test', 'test') is True
obj = plugins.NotifyMatrix(include_image=False)
obj = plugins.NotifyMatrix(host='host', include_image=False)
assert isinstance(obj, plugins.NotifyMatrix) is True
assert obj.access_token is None

View File

@ -49,6 +49,14 @@ apprise_url_tests = (
('kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]', {
# Support IPv6 Addresses
'instance': plugins.NotifyXBMC,
# Privacy URL
'privacy_url': 'kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]',
}),
('kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8282', {
# Support IPv6 Addresses with port
'instance': plugins.NotifyXBMC,
# Privacy URL
'privacy_url': 'kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8282',
}),
('kodi://user:pass@localhost', {
'instance': plugins.NotifyXBMC,

View File

@ -100,6 +100,23 @@ def test_parse_url():
# colon after hostname without port number is no good
assert utils.parse_url('http://hostname:') is None
# An invalid port
result = utils.parse_url(
'http://hostname:invalid', verify_host=False, strict_port=True)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] == 'invalid'
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:invalid'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# However if we don't verify the host, it is okay
result = utils.parse_url('http://hostname:', verify_host=False)
assert result['schema'] == 'http'
@ -116,14 +133,190 @@ def test_parse_url():
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# A port of Zero is not valid
assert utils.parse_url('http://hostname:0') is None
# A port of Zero is not valid with strict port checking
assert utils.parse_url('http://hostname:0', strict_port=True) is None
# Port set to zero; port is not stored
# Without strict port checking however, it is okay
result = utils.parse_url('http://hostname:0', strict_port=False)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] == 0
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:0'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# A negative port is not valid
assert utils.parse_url('http://hostname:-92', strict_port=True) is None
result = utils.parse_url(
'http://hostname:-92', verify_host=False, strict_port=True)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] == -92
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:-92'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# A port that is too large is not valid
assert utils.parse_url('http://hostname:65536', strict_port=True) is None
# This is an accetable port (the maximum)
result = utils.parse_url('http://hostname:65535', strict_port=True)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] == 65535
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:65535'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# This is an accetable port (the maximum)
result = utils.parse_url('http://hostname:1', strict_port=True)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] == 1
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:1'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# A port that was identfied as a string is invalid
assert utils.parse_url('http://hostname:invalid', strict_port=True) is None
result = utils.parse_url(
'http://hostname:invalid', verify_host=False, strict_port=True)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] == 'invalid'
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:invalid'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url(
'http://hostname:invalid', verify_host=False, strict_port=False)
assert result['schema'] == 'http'
assert result['host'] == 'hostname:invalid'
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:invalid'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url(
'http://hostname:invalid?key=value&-minuskey=mvalue',
verify_host=False, strict_port=False)
assert result['schema'] == 'http'
assert result['host'] == 'hostname:invalid'
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:invalid'
assert unquote(result['qsd']['-minuskey']) == 'mvalue'
assert unquote(result['qsd']['key']) == 'value'
assert unquote(result['qsd-']['minuskey']) == 'mvalue'
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# Handling of floats
assert utils.parse_url('http://hostname:4.2', strict_port=True) is None
result = utils.parse_url(
'http://hostname:4.2', verify_host=False, strict_port=True)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] == '4.2'
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:4.2'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
# A Port of zero is not acceptable for a regular hostname
assert utils.parse_url('http://hostname:0', strict_port=True) is None
# No host verification (zero is an acceptable port when this is the case
result = utils.parse_url('http://hostname:0', verify_host=False)
assert result['schema'] == 'http'
assert result['host'] == 'hostname:0'
assert result['port'] is None
assert result['host'] == 'hostname'
assert result['port'] == 0
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'http://hostname:0'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url(
'http://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8080',
verify_host=False)
assert result['schema'] == 'http'
assert result['host'] == '[2001:db8:002a:3256:adfe:05c0:0003:0006]'
assert result['port'] == 8080
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == \
'http://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8080'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
assert result['qsd:'] == {}
result = utils.parse_url(
'http://hostname:0', verify_host=False, strict_port=True)
assert result['schema'] == 'http'
assert result['host'] == 'hostname'
assert result['port'] == 0
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None