From e2ebdbdcf81182c240bd597943f40ddadf0ccefd Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Wed, 6 Oct 2021 17:31:34 -0400 Subject: [PATCH] Security CWE-312 and CWE-20 Handling (#453) --- apprise/Apprise.py | 56 ++++++--- apprise/AppriseAsset.py | 11 ++ apprise/config/ConfigBase.py | 77 ++++++++---- apprise/plugins/NotifyGoogleChat.py | 2 +- apprise/plugins/__init__.py | 12 +- apprise/utils.py | 175 +++++++++++++++++++++++++++- test/test_api.py | 20 ++-- test/test_apprise_config.py | 4 +- test/test_asyncio.py | 2 +- test/test_config_http.py | 2 +- test/test_logger.py | 81 +++++++++++++ test/test_utils.py | 69 +++++++++++ 12 files changed, 449 insertions(+), 62 deletions(-) diff --git a/apprise/Apprise.py b/apprise/Apprise.py index 608359d9..8207c200 100644 --- a/apprise/Apprise.py +++ b/apprise/Apprise.py @@ -34,6 +34,7 @@ from .common import MATCH_ALL_TAG from .utils import is_exclusive_match from .utils import parse_list from .utils import parse_urls +from .utils import cwe312_url from .logger import logger from .AppriseAsset import AppriseAsset @@ -123,9 +124,14 @@ class Apprise(object): # Initialize our result set results = None + # Prepare our Asset Object + asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset() + if isinstance(url, six.string_types): # Acquire our url tokens - results = plugins.url_to_dict(url) + results = plugins.url_to_dict( + url, secure_logging=asset.secure_logging) + if results is None: # Failed to parse the server URL; detailed logging handled # inside url_to_dict - nothing to report here. @@ -139,25 +145,30 @@ class Apprise(object): # schema is a mandatory dictionary item as it is the only way # we can index into our loaded plugins logger.error('Dictionary does not include a "schema" entry.') - logger.trace('Invalid dictionary unpacked as:{}{}'.format( - os.linesep, os.linesep.join( - ['{}="{}"'.format(k, v) for k, v in results.items()]))) + logger.trace( + 'Invalid dictionary unpacked as:{}{}'.format( + os.linesep, os.linesep.join( + ['{}="{}"'.format(k, v) + for k, v in results.items()]))) return None - logger.trace('Dictionary unpacked as:{}{}'.format( - os.linesep, os.linesep.join( - ['{}="{}"'.format(k, v) for k, v in results.items()]))) + logger.trace( + 'Dictionary unpacked as:{}{}'.format( + os.linesep, os.linesep.join( + ['{}="{}"'.format(k, v) for k, v in results.items()]))) + # Otherwise we handle the invalid input specified else: - logger.error('Invalid URL specified: {}'.format(url)) + logger.error( + 'An invalid URL type (%s) was specified for instantiation', + type(url)) return None # Build a list of tags to associate with the newly added notifications results['tag'] = set(parse_list(tag)) - # Prepare our Asset Object - results['asset'] = \ - asset if isinstance(asset, AppriseAsset) else AppriseAsset() + # Set our Asset Object + results['asset'] = asset if suppress_exceptions: try: @@ -166,14 +177,21 @@ class Apprise(object): plugin = plugins.SCHEMA_MAP[results['schema']](**results) # Create log entry of loaded URL - logger.debug('Loaded {} URL: {}'.format( - plugins.SCHEMA_MAP[results['schema']].service_name, - plugin.url())) + logger.debug( + 'Loaded {} URL: {}'.format( + plugins.SCHEMA_MAP[results['schema']].service_name, + plugin.url(privacy=asset.secure_logging))) except Exception: + # CWE-312 (Secure Logging) Handling + loggable_url = url if not asset.secure_logging \ + else cwe312_url(url) + # the arguments are invalid or can not be used. - logger.error('Could not load {} URL: {}'.format( - plugins.SCHEMA_MAP[results['schema']].service_name, url)) + logger.error( + 'Could not load {} URL: {}'.format( + plugins.SCHEMA_MAP[results['schema']].service_name, + loggable_url)) return None else: @@ -402,7 +420,7 @@ class Apprise(object): except Exception: # A catch all so we don't have to abort early # just because one of our plugins has a bug in it. - logger.exception("Notification Exception") + logger.exception("Unhandled Notification Exception") return False @staticmethod @@ -434,10 +452,10 @@ class Apprise(object): if len(self) == 0: # Nothing to notify - raise TypeError + raise TypeError("No service(s) to notify") if not (title or body): - raise TypeError + raise TypeError("No message content specified to deliver") if six.PY2: # Python 2.7.x Unicode Character Handling diff --git a/apprise/AppriseAsset.py b/apprise/AppriseAsset.py index 1e085592..af491008 100644 --- a/apprise/AppriseAsset.py +++ b/apprise/AppriseAsset.py @@ -110,6 +110,17 @@ class AppriseAsset(object): # to a new line. interpret_escapes = False + # For more detail see CWE-312 @ + # https://cwe.mitre.org/data/definitions/312.html + # + # By enabling this, the logging output has additional overhead applied to + # it preventing secure password and secret information from being + # displayed in the logging. Since there is overhead involved in performing + # this cleanup; system owners who run in a very isolated environment may + # choose to disable this for a slight performance bump. It is recommended + # that you leave this option as is otherwise. + secure_logging = True + def __init__(self, **kwargs): """ Asset Initialization diff --git a/apprise/config/ConfigBase.py b/apprise/config/ConfigBase.py index 8b00be37..f2b958ed 100644 --- a/apprise/config/ConfigBase.py +++ b/apprise/config/ConfigBase.py @@ -39,6 +39,7 @@ from ..utils import GET_SCHEMA_RE from ..utils import parse_list from ..utils import parse_bool from ..utils import parse_urls +from ..utils import cwe312_url from . import SCHEMA_MAP # Test whether token is valid or not @@ -209,8 +210,8 @@ class ConfigBase(URLBase): # Configuration files were detected; recursively populate them # If we have been configured to do so for url in configs: - if self.recursion > 0: + if self.recursion > 0: # Attempt to acquire the schema at the very least to allow # our configuration based urls. schema = GET_SCHEMA_RE.match(url) @@ -223,6 +224,7 @@ class ConfigBase(URLBase): url = os.path.join(self.config_path, url) url = '{}://{}'.format(schema, URLBase.quote(url)) + else: # Ensure our schema is always in lower case schema = schema.group('schema').lower() @@ -233,13 +235,17 @@ class ConfigBase(URLBase): 'Unsupported include schema {}.'.format(schema)) continue + # CWE-312 (Secure Logging) Handling + loggable_url = url if not asset.secure_logging \ + else cwe312_url(url) + # Parse our url details of the server object as dictionary # containing all of the information parsed from our URL results = SCHEMA_MAP[schema].parse_url(url) if not results: # Failed to parse the server URL self.logger.warning( - 'Unparseable include URL {}'.format(url)) + 'Unparseable include URL {}'.format(loggable_url)) continue # Handle cross inclusion based on allow_cross_includes rules @@ -253,7 +259,7 @@ class ConfigBase(URLBase): # Prevent the loading if insecure base protocols ConfigBase.logger.warning( 'Including {}:// based configuration is prohibited. ' - 'Ignoring URL {}'.format(schema, url)) + 'Ignoring URL {}'.format(schema, loggable_url)) continue # Prepare our Asset Object @@ -279,7 +285,7 @@ class ConfigBase(URLBase): except Exception as e: # the arguments are invalid or can not be used. self.logger.warning( - 'Could not load include URL: {}'.format(url)) + 'Could not load include URL: {}'.format(loggable_url)) self.logger.debug('Loading Exception: {}'.format(str(e))) continue @@ -292,16 +298,23 @@ class ConfigBase(URLBase): del cfg_plugin else: + # CWE-312 (Secure Logging) Handling + loggable_url = url if not asset.secure_logging \ + else cwe312_url(url) + self.logger.debug( - 'Recursion limit reached; ignoring Include URL: %s' % url) + 'Recursion limit reached; ignoring Include URL: %s', + loggable_url) if self._cached_servers: - self.logger.info('Loaded {} entries from {}'.format( - len(self._cached_servers), self.url())) + self.logger.info( + 'Loaded {} entries from {}'.format( + len(self._cached_servers), + self.url(privacy=asset.secure_logging))) else: self.logger.warning( 'Failed to load Apprise configuration from {}'.format( - self.url())) + self.url(privacy=asset.secure_logging))) # Set the time our content was cached at self._cached_time = time.time() @@ -531,6 +544,9 @@ class ConfigBase(URLBase): # the include keyword configs = list() + # Prepare our Asset Object + asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset() + # Define what a valid line should look like valid_line_re = re.compile( r'^\s*(?P([;#]+(?P.*))|' @@ -567,27 +583,37 @@ class ConfigBase(URLBase): continue if config: - ConfigBase.logger.debug('Include URL: {}'.format(config)) + # CWE-312 (Secure Logging) Handling + loggable_url = config if not asset.secure_logging \ + else cwe312_url(config) + + ConfigBase.logger.debug( + 'Include URL: {}'.format(loggable_url)) # Store our include line configs.append(config.strip()) continue + # CWE-312 (Secure Logging) Handling + loggable_url = url if not asset.secure_logging \ + else cwe312_url(url) + # Acquire our url tokens - results = plugins.url_to_dict(url) + results = plugins.url_to_dict( + url, secure_logging=asset.secure_logging) if results is None: # Failed to parse the server URL ConfigBase.logger.warning( - 'Unparseable URL {} on line {}.'.format(url, line)) + 'Unparseable URL {} on line {}.'.format( + loggable_url, line)) continue # Build a list of tags to associate with the newly added # notifications if any were set results['tag'] = set(parse_list(result.group('tags'))) - # Prepare our Asset Object - results['asset'] = \ - asset if isinstance(asset, AppriseAsset) else AppriseAsset() + # Set our Asset Object + results['asset'] = asset try: # Attempt to create an instance of our plugin using the @@ -595,13 +621,14 @@ class ConfigBase(URLBase): plugin = plugins.SCHEMA_MAP[results['schema']](**results) # Create log entry of loaded URL - ConfigBase.logger.debug('Loaded URL: {}'.format(plugin.url())) + ConfigBase.logger.debug( + 'Loaded URL: %s', plugin.url(privacy=asset.secure_logging)) except Exception as e: # the arguments are invalid or can not be used. ConfigBase.logger.warning( 'Could not load URL {} on line {}.'.format( - url, line)) + loggable_url, line)) ConfigBase.logger.debug('Loading Exception: %s' % str(e)) continue @@ -756,6 +783,10 @@ class ConfigBase(URLBase): # we can. Reset it to None on each iteration results = list() + # CWE-312 (Secure Logging) Handling + loggable_url = url if not asset.secure_logging \ + else cwe312_url(url) + if isinstance(url, six.string_types): # We're just a simple URL string... schema = GET_SCHEMA_RE.match(url) @@ -764,16 +795,18 @@ class ConfigBase(URLBase): # config file at least has something to take action # with. ConfigBase.logger.warning( - 'Invalid URL {}, entry #{}'.format(url, no + 1)) + 'Invalid URL {}, entry #{}'.format( + loggable_url, no + 1)) continue # We found a valid schema worthy of tracking; store it's # details: - _results = plugins.url_to_dict(url) + _results = plugins.url_to_dict( + url, secure_logging=asset.secure_logging) if _results is None: ConfigBase.logger.warning( 'Unparseable URL {}, entry #{}'.format( - url, no + 1)) + loggable_url, no + 1)) continue # add our results to our global set @@ -819,7 +852,8 @@ class ConfigBase(URLBase): 'Unsupported URL, entry #{}'.format(no + 1)) continue - _results = plugins.url_to_dict(_url) + _results = plugins.url_to_dict( + _url, secure_logging=asset.secure_logging) if _results is None: # Setup dictionary _results = { @@ -931,7 +965,8 @@ class ConfigBase(URLBase): # Create log entry of loaded URL ConfigBase.logger.debug( - 'Loaded URL: {}'.format(plugin.url())) + 'Loaded URL: {}'.format( + plugin.url(privacy=asset.secure_logging))) except Exception as e: # the arguments are invalid or can not be used. diff --git a/apprise/plugins/NotifyGoogleChat.py b/apprise/plugins/NotifyGoogleChat.py index e6c62cb9..2cba9840 100644 --- a/apprise/plugins/NotifyGoogleChat.py +++ b/apprise/plugins/NotifyGoogleChat.py @@ -301,7 +301,7 @@ class NotifyGoogleChat(NotifyBase): """ result = re.match( - r'^https://chat.googleapis.com/v1/spaces/' + r'^https://chat\.googleapis\.com/v1/spaces/' r'(?P[A-Z0-9_-]+)/messages/*(?P.+)$', url, re.I) diff --git a/apprise/plugins/__init__.py b/apprise/plugins/__init__.py index 3ac1ebbf..1d35f0b1 100644 --- a/apprise/plugins/__init__.py +++ b/apprise/plugins/__init__.py @@ -44,6 +44,7 @@ from ..common import NOTIFY_IMAGE_SIZES from ..common import NotifyType from ..common import NOTIFY_TYPES from ..utils import parse_list +from ..utils import cwe312_url from ..utils import GET_SCHEMA_RE from ..logger import logger from ..AppriseLocale import gettext_lazy as _ @@ -442,7 +443,7 @@ def details(plugin): } -def url_to_dict(url): +def url_to_dict(url, secure_logging=True): """ Takes an apprise URL and returns the tokens associated with it if they can be acquired based on the plugins available. @@ -457,13 +458,16 @@ def url_to_dict(url): # swap hash (#) tag values with their html version _url = url.replace('/#', '/%23') + # CWE-312 (Secure Logging) Handling + loggable_url = url if not secure_logging else cwe312_url(url) + # Attempt to acquire the schema at the very least to allow our plugins to # determine if they can make a better interpretation of a URL geared for # them. schema = GET_SCHEMA_RE.match(_url) if schema is None: # Not a valid URL; take an early exit - logger.error('Unsupported URL: {}'.format(url)) + logger.error('Unsupported URL: {}'.format(loggable_url)) return None # Ensure our schema is always in lower case @@ -480,7 +484,7 @@ def url_to_dict(url): None) if not results: - logger.error('Unparseable URL {}'.format(url)) + logger.error('Unparseable URL {}'.format(loggable_url)) return None logger.trace('URL {} unpacked as:{}{}'.format( @@ -493,7 +497,7 @@ def url_to_dict(url): results = SCHEMA_MAP[schema].parse_url(_url) if not results: logger.error('Unparseable {} URL {}'.format( - SCHEMA_MAP[schema].service_name, url)) + SCHEMA_MAP[schema].service_name, loggable_url)) return None logger.trace('{} URL {} unpacked as:{}{}'.format( diff --git a/apprise/utils.py b/apprise/utils.py index 59301865..137fcb45 100644 --- a/apprise/utils.py +++ b/apprise/utils.py @@ -216,7 +216,7 @@ def is_ipaddr(addr, ipv4=True, ipv6=True): return False -def is_hostname(hostname, ipv4=True, ipv6=True): +def is_hostname(hostname, ipv4=True, ipv6=True, underscore=True): """ Validate hostname """ @@ -244,10 +244,11 @@ def is_hostname(hostname, ipv4=True, ipv6=True): # - Hostnames can not start with the hyphen (-) character. # - as a workaround for https://github.com/docker/compose/issues/229 to # being able to address services in other stacks, we also allow - # underscores in hostnames + # underscores in hostnames (if flag is set accordingly) # - labels can not exceed 63 characters allowed = re.compile( - r'^[a-z0-9][a-z0-9_-]{1,62}(? 1 and \ + not is_hostname(word, ipv4=True, ipv6=True, underscore=False): + # Verify if it is a hostname or not + return '{}...{}'.format(word[0:1], word[-1:]) + + elif len(word) >= 16: + # an IP will be 15 characters so we don't want to use a smaller + # value then 16 (e.g 101.102.103.104) + # we can assume very long words are passwords otherwise + return '{}...{}'.format(word[0:1], word[-1:]) + + if advanced: + # + # Mark word a secret based on it's obscurity + # + + # Our variances will increase depending on these variables: + last_variance = None + obscurity = 0 + + for c in word: + # Detect our variance + if c.isdigit(): + variance = Variance.NUMERIC + elif c.isalpha() and c.isupper(): + variance = Variance.ALPHA_UPPER + elif c.isalpha() and c.islower(): + variance = Variance.ALPHA_LOWER + else: + variance = Variance.SPECIAL + + if last_variance != variance or variance == Variance.SPECIAL: + obscurity += 1 + + if obscurity >= threshold: + return '{}...{}'.format(word[0:1], word[-1:]) + + last_variance = variance + + # Otherwise we're good; return our word + return word + + +def cwe312_url(url): + """ + This function was written to help mask secure/private information that may + or may not be found on an Apprise URL. The idea is to not disrupt the + structure of the previous URL too much, yet still protect the users + private information from being logged directly to screen. + + For more detail see CWE-312 @ + https://cwe.mitre.org/data/definitions/312.html + + For example, consider the URL: http://user:password@localhost/ + + When passed into this function, the return value would be: + http://user:****@localhost/ + + Since apprise allows you to put private information everywhere in it's + custom URLs, it uses this function to manipulate the content before + returning to any kind of logger. + + The idea is that the URL can still be interpreted by the person who + constructed them, but not to an intruder. + """ + # Parse our URL + results = parse_url(url) + if not results: + # Nothing was returned (invalid data was fed in); return our + # information as it was fed to us (without changing it) + return url + + # Update our URL with values + results['password'] = cwe312_word(results['password'], force=True) + if not results['schema'].startswith('http'): + results['user'] = cwe312_word(results['user']) + results['host'] = cwe312_word(results['host']) + + else: + results['host'] = cwe312_word(results['host'], advanced=False) + results['user'] = cwe312_word(results['user'], advanced=False) + + # Apply our full path scan in all cases + results['fullpath'] = '/' + \ + '/'.join([cwe312_word(x) + for x in re.split( + r'[\\/]+', + results['fullpath'].lstrip('/'))]) \ + if results['fullpath'] else '' + + # + # Now re-assemble our URL for display purposes + # + + # Determine Authentication + auth = '' + if results['user'] and results['password']: + auth = '{user}:{password}@'.format( + user=results['user'], + password=results['password'], + ) + elif results['user']: + auth = '{user}@'.format( + user=results['user'], + ) + + params = '' + if results['qsd']: + params = '?{}'.format( + "&".join(["{}={}".format(k, cwe312_word(v, force=( + k in ('password', 'secret', 'pass', 'token', 'key', + 'id', 'apikey', 'to')))) + for k, v in results['qsd'].items()])) + + return '{schema}://{auth}{hostname}{port}{fullpath}{params}'.format( + schema=results['schema'], + auth=auth, + # never encode hostname since we're expecting it to be a valid one + hostname=results['host'], + port='' if not results['port'] else ':{}'.format(results['port']), + fullpath=results['fullpath'] if results['fullpath'] else '', + params=params, + ) + + @contextlib.contextmanager def environ(*remove, **update): """ diff --git a/test/test_api.py b/test/test_api.py index d9f8f006..b11d5024 100644 --- a/test/test_api.py +++ b/test/test_api.py @@ -190,7 +190,7 @@ def apprise_test(do_notify): # We fail whenever we're initialized raise TypeError() - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -204,7 +204,7 @@ def apprise_test(do_notify): super(GoodNotification, self).__init__( notify_format=NotifyFormat.HTML, **kwargs) - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -292,7 +292,7 @@ def apprise_test(do_notify): # Pretend everything is okay raise TypeError() - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -301,7 +301,7 @@ def apprise_test(do_notify): # Pretend everything is okay raise RuntimeError() - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -311,7 +311,7 @@ def apprise_test(do_notify): # Pretend everything is okay return False - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -346,7 +346,7 @@ def apprise_test(do_notify): # Pretend everything is okay raise TypeError() - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -744,7 +744,7 @@ def test_apprise_notify_formats(tmpdir): # Pretend everything is okay return True - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -760,7 +760,7 @@ def test_apprise_notify_formats(tmpdir): # Pretend everything is okay return True - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -776,7 +776,7 @@ def test_apprise_notify_formats(tmpdir): # Pretend everything is okay return True - def url(self): + def url(self, **kwargs): # Support URL return '' @@ -1066,7 +1066,7 @@ def test_apprise_details(): } }) - def url(self): + def url(self, **kwargs): # Support URL return '' diff --git a/test/test_apprise_config.py b/test/test_apprise_config.py index 60fe3043..af96dd54 100644 --- a/test/test_apprise_config.py +++ b/test/test_apprise_config.py @@ -246,7 +246,7 @@ def test_apprise_multi_config_entries(tmpdir): # Pretend everything is okay return True - def url(self): + def url(self, **kwargs): # support url() return '' @@ -537,7 +537,7 @@ def test_apprise_config_with_apprise_obj(tmpdir): # Pretend everything is okay return True - def url(self): + def url(self, **kwargs): # support url() return '' diff --git a/test/test_asyncio.py b/test/test_asyncio.py index 67daa132..68a26b1e 100644 --- a/test/test_asyncio.py +++ b/test/test_asyncio.py @@ -49,7 +49,7 @@ def test_apprise_asyncio_runtime_error(): super(GoodNotification, self).__init__( notify_format=NotifyFormat.HTML, **kwargs) - def url(self): + def url(self, **kwargs): # Support URL return '' diff --git a/test/test_config_http.py b/test/test_config_http.py index 1be7437a..2dfd01ee 100644 --- a/test/test_config_http.py +++ b/test/test_config_http.py @@ -69,7 +69,7 @@ def test_config_http(mock_post): # Pretend everything is okay return True - def url(self): + def url(self, **kwargs): # Support url() function return '' diff --git a/test/test_logger.py b/test/test_logger.py index 33fbfbb4..9e2d963c 100644 --- a/test/test_logger.py +++ b/test/test_logger.py @@ -28,7 +28,9 @@ import os import sys import mock import pytest +import requests from apprise import Apprise +from apprise import AppriseAsset from apprise import URLBase from apprise.logger import LogCapture @@ -620,3 +622,82 @@ def test_apprise_log_file_captures_py2(tmpdir): # Disable Logging logging.disable(logging.CRITICAL) + + +@pytest.mark.skipif(sys.version_info.major <= 2, reason="Requires Python 3.x+") +@mock.patch('requests.post') +def test_apprise_secure_logging(mock_post): + """ + API: Apprise() secure logging tests + """ + + # Ensure we're not running in a disabled state + logging.disable(logging.NOTSET) + + logger.setLevel(logging.CRITICAL) + + # Prepare Mock + mock_post.return_value = requests.Request() + mock_post.return_value.status_code = requests.codes.ok + + # Default Secure Logging is set to enabled + asset = AppriseAsset() + assert asset.secure_logging is True + + # Load our asset + a = Apprise(asset=asset) + + with LogCapture(level=logging.DEBUG) as stream: + # add a test server + assert a.add("json://user:pass1$-3!@localhost") is True + + # Our servers should carry this flag + a[0].asset.secure_logging is True + + logs = re.split(r'\r*\n', stream.getvalue().rstrip()) + assert len(logs) == 1 + entry = re.split(r'\s-\s', logs[0]) + assert len(entry) == 3 + assert entry[1] == 'DEBUG' + assert entry[2].startswith( + 'Loaded JSON URL: json://user:****@localhost/') + + # Send notification + assert a.notify("test") is True + + # Test our call count + assert mock_post.call_count == 1 + + # Reset + mock_post.reset_mock() + + # Now we test the reverse configuration and turn off + # secure logging. + + # Default Secure Logging is set to disable + asset = AppriseAsset(secure_logging=False) + assert asset.secure_logging is False + + # Load our asset + a = Apprise(asset=asset) + + with LogCapture(level=logging.DEBUG) as stream: + # add a test server + assert a.add("json://user:pass1$-3!@localhost") is True + + # Our servers should carry this flag + a[0].asset.secure_logging is False + + logs = re.split(r'\r*\n', stream.getvalue().rstrip()) + assert len(logs) == 1 + entry = re.split(r'\s-\s', logs[0]) + assert len(entry) == 3 + assert entry[1] == 'DEBUG' + + # Note that our password is no longer escaped (it is however + # url encoded) + assert entry[2].startswith( + 'Loaded JSON URL: json://user:pass1%24-3%21@localhost/') + + # Disable Logging + logging.disable(logging.CRITICAL) diff --git a/test/test_utils.py b/test/test_utils.py index 35e73637..f0c90a0c 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -554,6 +554,13 @@ def test_is_hostname(): assert utils.is_hostname('valid-underscores_in_host.ca') == \ 'valid-underscores_in_host.ca' + # Underscores are supported by default + assert utils.is_hostname('valid_dashes_in_host.ca') == \ + 'valid_dashes_in_host.ca' + # However they are not if specified otherwise: + assert utils.is_hostname( + 'valid_dashes_in_host.ca', underscore=False) is False + # Invalid Hostnames assert utils.is_hostname('-hostname.that.starts.with.a.dash') is False assert utils.is_hostname('invalid-characters_#^.ca') is False @@ -1493,3 +1500,65 @@ def test_apply_templating(): template, app_mode=utils.TemplateType.JSON, **{'value': '"quotes are escaped"'}) assert result == '{value: "\\"quotes are escaped\\""}' + + +def test_cwe312_word(): + """utils: cwe312_word() testing + """ + assert utils.cwe312_word(None) is None + assert utils.cwe312_word(42) == 42 + assert utils.cwe312_word('') == '' + assert utils.cwe312_word(' ') == ' ' + assert utils.cwe312_word('!') == '!' + + assert utils.cwe312_word('a') == 'a' + assert utils.cwe312_word('ab') == 'ab' + assert utils.cwe312_word('abc') == 'abc' + assert utils.cwe312_word('abcd') == 'abcd' + assert utils.cwe312_word('abcd', force=True) == 'a...d' + + assert utils.cwe312_word('abc--d') == 'abc--d' + assert utils.cwe312_word('a-domain.ca') == 'a...a' + + # Variances to still catch domain + assert utils.cwe312_word('a-domain.ca', advanced=False) == 'a-domain.ca' + assert utils.cwe312_word('a-domain.ca', threshold=6) == 'a-domain.ca' + + +def test_cwe312_url(): + """utils: cwe312_url() testing + """ + assert utils.cwe312_url(None) is None + assert utils.cwe312_url(42) == 42 + assert utils.cwe312_url('http://') == 'http://' + assert utils.cwe312_url('discord://') == 'discord://' + assert utils.cwe312_url('path') == 'http://path' + assert utils.cwe312_url('path/') == 'http://path/' + + # Now test http:// private data + assert utils.cwe312_url( + 'http://user:pass123@localhost') == 'http://user:p...3@localhost' + assert utils.cwe312_url( + 'http://user@localhost') == 'http://user@localhost' + assert utils.cwe312_url( + 'http://user@localhost?password=abc123') == \ + 'http://user@localhost?password=a...3' + assert utils.cwe312_url( + 'http://user@localhost?secret=secret-.12345') == \ + 'http://user@localhost?secret=s...5' + + # Now test other:// private data + assert utils.cwe312_url( + 'gitter://b5637831f563aa846bb5b2c27d8fe8f633b8f026/apprise') == \ + 'gitter://b...6/apprise' + assert utils.cwe312_url( + 'gitter://b5637831f563aa846bb5b2c27d8fe8f633b8f026' + '/apprise/?pass=abc123') == \ + 'gitter://b...6/apprise?pass=a...3' + + assert utils.cwe312_url( + 'slack://mybot@xoxb-43598234231-3248932482278-BZK5Wj15B9mPh1RkShJoCZ44' + '/lead2gold@gmail.com') == 'slack://mybot@x...4/l...m' + assert utils.cwe312_url( + 'slack://test@B4QP3WWB4/J3QWT41JM/XIl2ffpqXkzkwMXrJdevi7W3/' + '#random') == 'slack://test@B...4/J...M/X...3'