From 259462efcc4c2d87f317061972b90617acfbc666 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Mon, 14 Mar 2022 08:28:25 -0400 Subject: [PATCH] improved URL parsing (#544) --- apprise/plugins/NotifyMatrix.py | 17 +++ apprise/utils.py | 58 +++++++-- test/test_plugin_matrix.py | 59 ++++++---- test/test_plugin_xbmc_kodi.py | 8 ++ test/test_utils.py | 203 +++++++++++++++++++++++++++++++- 5 files changed, 310 insertions(+), 35 deletions(-) diff --git a/apprise/plugins/NotifyMatrix.py b/apprise/plugins/NotifyMatrix.py index b2103d99..8b7c6eb4 100644 --- a/apprise/plugins/NotifyMatrix.py +++ b/apprise/plugins/NotifyMatrix.py @@ -43,6 +43,7 @@ from ..common import NotifyFormat from ..utils import parse_bool from ..utils import parse_list from ..utils import apply_template +from ..utils import is_hostname from ..utils import validate_regex from ..AppriseLocale import gettext_lazy as _ @@ -287,6 +288,22 @@ class NotifyMatrix(NotifyBase): self.logger.warning(msg) raise TypeError(msg) + elif not is_hostname(self.host): + msg = 'An invalid Matrix Hostname ({}) was specified'\ + .format(self.host) + self.logger.warning(msg) + raise TypeError(msg) + else: + # Verify port if specified + if self.port is not None and not ( + isinstance(self.port, int) + and self.port >= self.template_tokens['port']['min'] + and self.port <= self.template_tokens['port']['max']): + msg = 'An invalid Matrix Port ({}) was specified'\ + .format(self.port) + self.logger.warning(msg) + raise TypeError(msg) + def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): """ Perform Matrix Notification diff --git a/apprise/utils.py b/apprise/utils.py index 0c525d03..cf4ecdda 100644 --- a/apprise/utils.py +++ b/apprise/utils.py @@ -565,7 +565,7 @@ def parse_qsd(qs): return result -def parse_url(url, default_schema='http', verify_host=True): +def parse_url(url, default_schema='http', verify_host=True, strict_port=False): """A function that greatly simplifies the parsing of a url specified by the end user. @@ -697,13 +697,29 @@ def parse_url(url, default_schema='http', verify_host=True): # and it's already assigned pass - # Max port is 65535 so (1,5 digits) - match = re.search( - r'^(?P.+):(?P[1-9][0-9]{0,4})$', result['host']) - if match: + # Port Parsing + pmatch = re.search( + r'^(?P([[0-9a-f:]+]|[^:]+)):(?P[^:]*)$', + result['host']) + + if pmatch: # Separate our port from our hostname (if port is detected) - result['host'] = match.group('host') - result['port'] = int(match.group('port')) + result['host'] = pmatch.group('host') + try: + # If we're dealing with an integer, go ahead and convert it + # otherwise return an 'x' which will raise a ValueError + # + # This small extra check allows us to treat floats/doubles + # as strings. Hence a value like '4.2' won't be converted to a 4 + # (and the .2 lost) + result['port'] = int( + pmatch.group('port') + if re.search(r'[0-9]', pmatch.group('port')) else 'x') + + except ValueError: + if verify_host: + # Invalid Host Specified + return None if verify_host: # Verify and Validate our hostname @@ -713,6 +729,26 @@ def parse_url(url, default_schema='http', verify_host=True): # some indication as to what went wrong return None + # Max port is 65535 and min is 1 + if isinstance(result['port'], int) and not (( + not strict_port or ( + strict_port and + result['port'] > 0 and result['port'] <= 65535))): + + # An invalid port was specified + return None + + elif pmatch and not isinstance(result['port'], int): + if strict_port: + # Store port + result['port'] = pmatch.group('port').strip() + + else: + # Fall back + result['port'] = None + result['host'] = '{}:{}'.format( + pmatch.group('host'), pmatch.group('port')) + # Re-assemble cleaned up version of the url result['url'] = '%s://' % result['schema'] if isinstance(result['user'], six.string_types): @@ -725,8 +761,12 @@ def parse_url(url, default_schema='http', verify_host=True): result['url'] += '@' result['url'] += result['host'] - if result['port']: - result['url'] += ':%d' % result['port'] + if result['port'] is not None: + try: + result['url'] += ':%d' % result['port'] + + except TypeError: + result['url'] += ':%s' % result['port'] if result['fullpath']: result['url'] += result['fullpath'] diff --git a/test/test_plugin_matrix.py b/test/test_plugin_matrix.py index 7a3101d9..01f8b426 100644 --- a/test/test_plugin_matrix.py +++ b/test/test_plugin_matrix.py @@ -119,6 +119,20 @@ apprise_url_tests = ( # user and token specified; image set to True 'instance': plugins.NotifyMatrix, }), + # A Bunch of bad ports + ('matrixs://user:pass@hostname:port/#room_alias', { + # Invalid Port specified (was a string) + 'instance': TypeError, + }), + ('matrixs://user:pass@hostname:0/#room_alias', { + # Invalid Port specified (was a string) + 'instance': TypeError, + }), + ('matrixs://user:pass@hostname:65536/#room_alias', { + # Invalid Port specified (was a string) + 'instance': TypeError, + }), + # More general testing... ('matrixs://user@{}?mode=t2bot&format=markdown&image=True' .format('a' * 64), { # user and token specified; image set to True @@ -206,25 +220,26 @@ def test_plugin_matrix_general(mock_post, mock_get): mock_post.return_value = request # Variation Initializations - obj = plugins.NotifyMatrix(targets='#abcd') + obj = plugins.NotifyMatrix(host='host', targets='#abcd') assert isinstance(obj, plugins.NotifyMatrix) is True assert isinstance(obj.url(), six.string_types) is True # Registration successful assert obj.send(body="test") is True - obj = plugins.NotifyMatrix(user='user', targets='#abcd') + obj = plugins.NotifyMatrix(host='host', user='user', targets='#abcd') assert isinstance(obj, plugins.NotifyMatrix) is True assert isinstance(obj.url(), six.string_types) is True # Registration successful assert obj.send(body="test") is True - obj = plugins.NotifyMatrix(password='passwd', targets='#abcd') + obj = plugins.NotifyMatrix(host='host', password='passwd', targets='#abcd') assert isinstance(obj, plugins.NotifyMatrix) is True assert isinstance(obj.url(), six.string_types) is True # A username gets automatically generated in these cases assert obj.send(body="test") is True - obj = plugins.NotifyMatrix(user='user', password='passwd', targets='#abcd') + obj = plugins.NotifyMatrix( + host='host', user='user', password='passwd', targets='#abcd') assert isinstance(obj.url(), six.string_types) is True assert isinstance(obj, plugins.NotifyMatrix) is True # Registration Successful @@ -279,17 +294,18 @@ def test_plugin_matrix_general(mock_post, mock_get): # Fails because we couldn't register because of 404 errors assert obj.send(body="test") is False - obj = plugins.NotifyMatrix(user='test', targets='#abcd') + obj = plugins.NotifyMatrix(host='host', user='test', targets='#abcd') assert isinstance(obj, plugins.NotifyMatrix) is True # Fails because we still couldn't register assert obj.send(user='test', password='passwd', body="test") is False - obj = plugins.NotifyMatrix(user='test', password='passwd', targets='#abcd') + obj = plugins.NotifyMatrix( + host='host', user='test', password='passwd', targets='#abcd') assert isinstance(obj, plugins.NotifyMatrix) is True # Fails because we still couldn't register assert obj.send(body="test") is False - obj = plugins.NotifyMatrix(password='passwd', targets='#abcd') + obj = plugins.NotifyMatrix(host='host', password='passwd', targets='#abcd') # Fails because we still couldn't register assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.send(body="test") is False @@ -317,7 +333,7 @@ def test_plugin_matrix_general(mock_post, mock_get): request.content = dumps(response_obj) request.status_code = requests.codes.ok - obj = plugins.NotifyMatrix(targets=None) + obj = plugins.NotifyMatrix(host='host', targets=None) assert isinstance(obj, plugins.NotifyMatrix) is True # Force a empty joined list response @@ -377,14 +393,15 @@ def test_plugin_matrix_fetch(mock_post, mock_get): mock_post.side_effect = fetch_failed obj = plugins.NotifyMatrix( - user='user', password='passwd', include_image=True) + host='host', user='user', password='passwd', include_image=True) assert isinstance(obj, plugins.NotifyMatrix) is True # We would hve failed to send our image notification assert obj.send(user='test', password='passwd', body="test") is False # Do the same query with no images to fetch asset = AppriseAsset(image_path_mask=False, image_url_mask=False) - obj = plugins.NotifyMatrix(user='user', password='passwd', asset=asset) + obj = plugins.NotifyMatrix( + host='host', user='user', password='passwd', asset=asset) assert isinstance(obj, plugins.NotifyMatrix) is True # We would hve failed to send our notification assert obj.send(user='test', password='passwd', body="test") is False @@ -412,7 +429,7 @@ def test_plugin_matrix_fetch(mock_post, mock_get): mock_post.return_value = request mock_get.return_value = request - obj = plugins.NotifyMatrix(include_image=True) + obj = plugins.NotifyMatrix(host='host', include_image=True) assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None assert obj._register() is True @@ -465,7 +482,7 @@ def test_plugin_matrix_auth(mock_post, mock_get): mock_post.return_value = request mock_get.return_value = request - obj = plugins.NotifyMatrix() + obj = plugins.NotifyMatrix(host='localhost') assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None # logging out without an access_token is silently a success @@ -502,7 +519,7 @@ def test_plugin_matrix_auth(mock_post, mock_get): assert obj.access_token is None # So will login - obj = plugins.NotifyMatrix(user='user', password='password') + obj = plugins.NotifyMatrix(host='host', user='user', password='password') assert isinstance(obj, plugins.NotifyMatrix) is True assert obj._login() is False assert obj.access_token is None @@ -567,7 +584,7 @@ def test_plugin_matrix_rooms(mock_post, mock_get): mock_post.return_value = request mock_get.return_value = request - obj = plugins.NotifyMatrix() + obj = plugins.NotifyMatrix(host='host') assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None @@ -626,7 +643,7 @@ def test_plugin_matrix_rooms(mock_post, mock_get): # Room creation request.status_code = requests.codes.ok - obj = plugins.NotifyMatrix() + obj = plugins.NotifyMatrix(host='host') assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None @@ -671,7 +688,7 @@ def test_plugin_matrix_rooms(mock_post, mock_get): # Room detection request.status_code = requests.codes.ok request.content = dumps(response_obj) - obj = plugins.NotifyMatrix() + obj = plugins.NotifyMatrix(host='localhost') assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None @@ -697,7 +714,7 @@ def test_plugin_matrix_rooms(mock_post, mock_get): # Room id lookup request.status_code = requests.codes.ok - obj = plugins.NotifyMatrix() + obj = plugins.NotifyMatrix(host='localhost') assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None @@ -780,7 +797,7 @@ def test_plugin_matrix_image_errors(mock_post, mock_get): mock_get.side_effect = mock_function_handing mock_post.side_effect = mock_function_handing - obj = plugins.NotifyMatrix(include_image=True) + obj = plugins.NotifyMatrix(host='host', include_image=True) assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None @@ -788,7 +805,7 @@ def test_plugin_matrix_image_errors(mock_post, mock_get): # we had post errors (of any kind) we still report a failure. assert obj.notify('test', 'test') is False - obj = plugins.NotifyMatrix(include_image=False) + obj = plugins.NotifyMatrix(host='host', include_image=False) assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None @@ -817,13 +834,13 @@ def test_plugin_matrix_image_errors(mock_post, mock_get): # Prepare Mock mock_get.side_effect = mock_function_handing mock_post.side_effect = mock_function_handing - obj = plugins.NotifyMatrix(include_image=True) + obj = plugins.NotifyMatrix(host='host', include_image=True) assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None assert obj.notify('test', 'test') is True - obj = plugins.NotifyMatrix(include_image=False) + obj = plugins.NotifyMatrix(host='host', include_image=False) assert isinstance(obj, plugins.NotifyMatrix) is True assert obj.access_token is None diff --git a/test/test_plugin_xbmc_kodi.py b/test/test_plugin_xbmc_kodi.py index e0231a7e..36abda74 100644 --- a/test/test_plugin_xbmc_kodi.py +++ b/test/test_plugin_xbmc_kodi.py @@ -49,6 +49,14 @@ apprise_url_tests = ( ('kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]', { # Support IPv6 Addresses 'instance': plugins.NotifyXBMC, + # Privacy URL + 'privacy_url': 'kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]', + }), + ('kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8282', { + # Support IPv6 Addresses with port + 'instance': plugins.NotifyXBMC, + # Privacy URL + 'privacy_url': 'kodi://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8282', }), ('kodi://user:pass@localhost', { 'instance': plugins.NotifyXBMC, diff --git a/test/test_utils.py b/test/test_utils.py index 5d0383e1..502441ab 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -100,6 +100,23 @@ def test_parse_url(): # colon after hostname without port number is no good assert utils.parse_url('http://hostname:') is None + # An invalid port + result = utils.parse_url( + 'http://hostname:invalid', verify_host=False, strict_port=True) + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] == 'invalid' + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:invalid' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + # However if we don't verify the host, it is okay result = utils.parse_url('http://hostname:', verify_host=False) assert result['schema'] == 'http' @@ -116,14 +133,190 @@ def test_parse_url(): assert result['qsd+'] == {} assert result['qsd:'] == {} - # A port of Zero is not valid - assert utils.parse_url('http://hostname:0') is None + # A port of Zero is not valid with strict port checking + assert utils.parse_url('http://hostname:0', strict_port=True) is None - # Port set to zero; port is not stored + # Without strict port checking however, it is okay + result = utils.parse_url('http://hostname:0', strict_port=False) + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] == 0 + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:0' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + # A negative port is not valid + assert utils.parse_url('http://hostname:-92', strict_port=True) is None + result = utils.parse_url( + 'http://hostname:-92', verify_host=False, strict_port=True) + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] == -92 + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:-92' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + # A port that is too large is not valid + assert utils.parse_url('http://hostname:65536', strict_port=True) is None + + # This is an accetable port (the maximum) + result = utils.parse_url('http://hostname:65535', strict_port=True) + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] == 65535 + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:65535' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + # This is an accetable port (the maximum) + result = utils.parse_url('http://hostname:1', strict_port=True) + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] == 1 + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:1' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + # A port that was identfied as a string is invalid + assert utils.parse_url('http://hostname:invalid', strict_port=True) is None + result = utils.parse_url( + 'http://hostname:invalid', verify_host=False, strict_port=True) + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] == 'invalid' + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:invalid' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + result = utils.parse_url( + 'http://hostname:invalid', verify_host=False, strict_port=False) + assert result['schema'] == 'http' + assert result['host'] == 'hostname:invalid' + assert result['port'] is None + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:invalid' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + result = utils.parse_url( + 'http://hostname:invalid?key=value&-minuskey=mvalue', + verify_host=False, strict_port=False) + assert result['schema'] == 'http' + assert result['host'] == 'hostname:invalid' + assert result['port'] is None + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:invalid' + assert unquote(result['qsd']['-minuskey']) == 'mvalue' + assert unquote(result['qsd']['key']) == 'value' + assert unquote(result['qsd-']['minuskey']) == 'mvalue' + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + # Handling of floats + assert utils.parse_url('http://hostname:4.2', strict_port=True) is None + result = utils.parse_url( + 'http://hostname:4.2', verify_host=False, strict_port=True) + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] == '4.2' + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:4.2' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + # A Port of zero is not acceptable for a regular hostname + assert utils.parse_url('http://hostname:0', strict_port=True) is None + + # No host verification (zero is an acceptable port when this is the case result = utils.parse_url('http://hostname:0', verify_host=False) assert result['schema'] == 'http' - assert result['host'] == 'hostname:0' - assert result['port'] is None + assert result['host'] == 'hostname' + assert result['port'] == 0 + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == 'http://hostname:0' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + result = utils.parse_url( + 'http://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8080', + verify_host=False) + assert result['schema'] == 'http' + assert result['host'] == '[2001:db8:002a:3256:adfe:05c0:0003:0006]' + assert result['port'] == 8080 + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] is None + assert result['path'] is None + assert result['query'] is None + assert result['url'] == \ + 'http://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8080' + assert result['qsd'] == {} + assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} + + result = utils.parse_url( + 'http://hostname:0', verify_host=False, strict_port=True) + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] == 0 assert result['user'] is None assert result['password'] is None assert result['fullpath'] is None