Security CWE-312 and CWE-20 Handling (#453)

This commit is contained in:
Chris Caron 2021-10-06 17:31:34 -04:00 committed by GitHub
parent abb7547f20
commit e2ebdbdcf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 449 additions and 62 deletions

View File

@ -34,6 +34,7 @@ from .common import MATCH_ALL_TAG
from .utils import is_exclusive_match
from .utils import parse_list
from .utils import parse_urls
from .utils import cwe312_url
from .logger import logger
from .AppriseAsset import AppriseAsset
@ -123,9 +124,14 @@ class Apprise(object):
# Initialize our result set
results = None
# Prepare our Asset Object
asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset()
if isinstance(url, six.string_types):
# Acquire our url tokens
results = plugins.url_to_dict(url)
results = plugins.url_to_dict(
url, secure_logging=asset.secure_logging)
if results is None:
# Failed to parse the server URL; detailed logging handled
# inside url_to_dict - nothing to report here.
@ -139,25 +145,30 @@ class Apprise(object):
# schema is a mandatory dictionary item as it is the only way
# we can index into our loaded plugins
logger.error('Dictionary does not include a "schema" entry.')
logger.trace('Invalid dictionary unpacked as:{}{}'.format(
os.linesep, os.linesep.join(
['{}="{}"'.format(k, v) for k, v in results.items()])))
logger.trace(
'Invalid dictionary unpacked as:{}{}'.format(
os.linesep, os.linesep.join(
['{}="{}"'.format(k, v)
for k, v in results.items()])))
return None
logger.trace('Dictionary unpacked as:{}{}'.format(
os.linesep, os.linesep.join(
['{}="{}"'.format(k, v) for k, v in results.items()])))
logger.trace(
'Dictionary unpacked as:{}{}'.format(
os.linesep, os.linesep.join(
['{}="{}"'.format(k, v) for k, v in results.items()])))
# Otherwise we handle the invalid input specified
else:
logger.error('Invalid URL specified: {}'.format(url))
logger.error(
'An invalid URL type (%s) was specified for instantiation',
type(url))
return None
# Build a list of tags to associate with the newly added notifications
results['tag'] = set(parse_list(tag))
# Prepare our Asset Object
results['asset'] = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Set our Asset Object
results['asset'] = asset
if suppress_exceptions:
try:
@ -166,14 +177,21 @@ class Apprise(object):
plugin = plugins.SCHEMA_MAP[results['schema']](**results)
# Create log entry of loaded URL
logger.debug('Loaded {} URL: {}'.format(
plugins.SCHEMA_MAP[results['schema']].service_name,
plugin.url()))
logger.debug(
'Loaded {} URL: {}'.format(
plugins.SCHEMA_MAP[results['schema']].service_name,
plugin.url(privacy=asset.secure_logging)))
except Exception:
# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)
# the arguments are invalid or can not be used.
logger.error('Could not load {} URL: {}'.format(
plugins.SCHEMA_MAP[results['schema']].service_name, url))
logger.error(
'Could not load {} URL: {}'.format(
plugins.SCHEMA_MAP[results['schema']].service_name,
loggable_url))
return None
else:
@ -402,7 +420,7 @@ class Apprise(object):
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
logger.exception("Unhandled Notification Exception")
return False
@staticmethod
@ -434,10 +452,10 @@ class Apprise(object):
if len(self) == 0:
# Nothing to notify
raise TypeError
raise TypeError("No service(s) to notify")
if not (title or body):
raise TypeError
raise TypeError("No message content specified to deliver")
if six.PY2:
# Python 2.7.x Unicode Character Handling

View File

@ -110,6 +110,17 @@ class AppriseAsset(object):
# to a new line.
interpret_escapes = False
# For more detail see CWE-312 @
# https://cwe.mitre.org/data/definitions/312.html
#
# By enabling this, the logging output has additional overhead applied to
# it preventing secure password and secret information from being
# displayed in the logging. Since there is overhead involved in performing
# this cleanup; system owners who run in a very isolated environment may
# choose to disable this for a slight performance bump. It is recommended
# that you leave this option as is otherwise.
secure_logging = True
def __init__(self, **kwargs):
"""
Asset Initialization

View File

@ -39,6 +39,7 @@ from ..utils import GET_SCHEMA_RE
from ..utils import parse_list
from ..utils import parse_bool
from ..utils import parse_urls
from ..utils import cwe312_url
from . import SCHEMA_MAP
# Test whether token is valid or not
@ -209,8 +210,8 @@ class ConfigBase(URLBase):
# Configuration files were detected; recursively populate them
# If we have been configured to do so
for url in configs:
if self.recursion > 0:
if self.recursion > 0:
# Attempt to acquire the schema at the very least to allow
# our configuration based urls.
schema = GET_SCHEMA_RE.match(url)
@ -223,6 +224,7 @@ class ConfigBase(URLBase):
url = os.path.join(self.config_path, url)
url = '{}://{}'.format(schema, URLBase.quote(url))
else:
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
@ -233,13 +235,17 @@ class ConfigBase(URLBase):
'Unsupported include schema {}.'.format(schema))
continue
# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)
# Parse our url details of the server object as dictionary
# containing all of the information parsed from our URL
results = SCHEMA_MAP[schema].parse_url(url)
if not results:
# Failed to parse the server URL
self.logger.warning(
'Unparseable include URL {}'.format(url))
'Unparseable include URL {}'.format(loggable_url))
continue
# Handle cross inclusion based on allow_cross_includes rules
@ -253,7 +259,7 @@ class ConfigBase(URLBase):
# Prevent the loading if insecure base protocols
ConfigBase.logger.warning(
'Including {}:// based configuration is prohibited. '
'Ignoring URL {}'.format(schema, url))
'Ignoring URL {}'.format(schema, loggable_url))
continue
# Prepare our Asset Object
@ -279,7 +285,7 @@ class ConfigBase(URLBase):
except Exception as e:
# the arguments are invalid or can not be used.
self.logger.warning(
'Could not load include URL: {}'.format(url))
'Could not load include URL: {}'.format(loggable_url))
self.logger.debug('Loading Exception: {}'.format(str(e)))
continue
@ -292,16 +298,23 @@ class ConfigBase(URLBase):
del cfg_plugin
else:
# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)
self.logger.debug(
'Recursion limit reached; ignoring Include URL: %s' % url)
'Recursion limit reached; ignoring Include URL: %s',
loggable_url)
if self._cached_servers:
self.logger.info('Loaded {} entries from {}'.format(
len(self._cached_servers), self.url()))
self.logger.info(
'Loaded {} entries from {}'.format(
len(self._cached_servers),
self.url(privacy=asset.secure_logging)))
else:
self.logger.warning(
'Failed to load Apprise configuration from {}'.format(
self.url()))
self.url(privacy=asset.secure_logging)))
# Set the time our content was cached at
self._cached_time = time.time()
@ -531,6 +544,9 @@ class ConfigBase(URLBase):
# the include keyword
configs = list()
# Prepare our Asset Object
asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Define what a valid line should look like
valid_line_re = re.compile(
r'^\s*(?P<line>([;#]+(?P<comment>.*))|'
@ -567,27 +583,37 @@ class ConfigBase(URLBase):
continue
if config:
ConfigBase.logger.debug('Include URL: {}'.format(config))
# CWE-312 (Secure Logging) Handling
loggable_url = config if not asset.secure_logging \
else cwe312_url(config)
ConfigBase.logger.debug(
'Include URL: {}'.format(loggable_url))
# Store our include line
configs.append(config.strip())
continue
# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)
# Acquire our url tokens
results = plugins.url_to_dict(url)
results = plugins.url_to_dict(
url, secure_logging=asset.secure_logging)
if results is None:
# Failed to parse the server URL
ConfigBase.logger.warning(
'Unparseable URL {} on line {}.'.format(url, line))
'Unparseable URL {} on line {}.'.format(
loggable_url, line))
continue
# Build a list of tags to associate with the newly added
# notifications if any were set
results['tag'] = set(parse_list(result.group('tags')))
# Prepare our Asset Object
results['asset'] = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Set our Asset Object
results['asset'] = asset
try:
# Attempt to create an instance of our plugin using the
@ -595,13 +621,14 @@ class ConfigBase(URLBase):
plugin = plugins.SCHEMA_MAP[results['schema']](**results)
# Create log entry of loaded URL
ConfigBase.logger.debug('Loaded URL: {}'.format(plugin.url()))
ConfigBase.logger.debug(
'Loaded URL: %s', plugin.url(privacy=asset.secure_logging))
except Exception as e:
# the arguments are invalid or can not be used.
ConfigBase.logger.warning(
'Could not load URL {} on line {}.'.format(
url, line))
loggable_url, line))
ConfigBase.logger.debug('Loading Exception: %s' % str(e))
continue
@ -756,6 +783,10 @@ class ConfigBase(URLBase):
# we can. Reset it to None on each iteration
results = list()
# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)
if isinstance(url, six.string_types):
# We're just a simple URL string...
schema = GET_SCHEMA_RE.match(url)
@ -764,16 +795,18 @@ class ConfigBase(URLBase):
# config file at least has something to take action
# with.
ConfigBase.logger.warning(
'Invalid URL {}, entry #{}'.format(url, no + 1))
'Invalid URL {}, entry #{}'.format(
loggable_url, no + 1))
continue
# We found a valid schema worthy of tracking; store it's
# details:
_results = plugins.url_to_dict(url)
_results = plugins.url_to_dict(
url, secure_logging=asset.secure_logging)
if _results is None:
ConfigBase.logger.warning(
'Unparseable URL {}, entry #{}'.format(
url, no + 1))
loggable_url, no + 1))
continue
# add our results to our global set
@ -819,7 +852,8 @@ class ConfigBase(URLBase):
'Unsupported URL, entry #{}'.format(no + 1))
continue
_results = plugins.url_to_dict(_url)
_results = plugins.url_to_dict(
_url, secure_logging=asset.secure_logging)
if _results is None:
# Setup dictionary
_results = {
@ -931,7 +965,8 @@ class ConfigBase(URLBase):
# Create log entry of loaded URL
ConfigBase.logger.debug(
'Loaded URL: {}'.format(plugin.url()))
'Loaded URL: {}'.format(
plugin.url(privacy=asset.secure_logging)))
except Exception as e:
# the arguments are invalid or can not be used.

View File

@ -301,7 +301,7 @@ class NotifyGoogleChat(NotifyBase):
"""
result = re.match(
r'^https://chat.googleapis.com/v1/spaces/'
r'^https://chat\.googleapis\.com/v1/spaces/'
r'(?P<workspace>[A-Z0-9_-]+)/messages/*(?P<params>.+)$',
url, re.I)

View File

@ -44,6 +44,7 @@ from ..common import NOTIFY_IMAGE_SIZES
from ..common import NotifyType
from ..common import NOTIFY_TYPES
from ..utils import parse_list
from ..utils import cwe312_url
from ..utils import GET_SCHEMA_RE
from ..logger import logger
from ..AppriseLocale import gettext_lazy as _
@ -442,7 +443,7 @@ def details(plugin):
}
def url_to_dict(url):
def url_to_dict(url, secure_logging=True):
"""
Takes an apprise URL and returns the tokens associated with it
if they can be acquired based on the plugins available.
@ -457,13 +458,16 @@ def url_to_dict(url):
# swap hash (#) tag values with their html version
_url = url.replace('/#', '/%23')
# CWE-312 (Secure Logging) Handling
loggable_url = url if not secure_logging else cwe312_url(url)
# Attempt to acquire the schema at the very least to allow our plugins to
# determine if they can make a better interpretation of a URL geared for
# them.
schema = GET_SCHEMA_RE.match(_url)
if schema is None:
# Not a valid URL; take an early exit
logger.error('Unsupported URL: {}'.format(url))
logger.error('Unsupported URL: {}'.format(loggable_url))
return None
# Ensure our schema is always in lower case
@ -480,7 +484,7 @@ def url_to_dict(url):
None)
if not results:
logger.error('Unparseable URL {}'.format(url))
logger.error('Unparseable URL {}'.format(loggable_url))
return None
logger.trace('URL {} unpacked as:{}{}'.format(
@ -493,7 +497,7 @@ def url_to_dict(url):
results = SCHEMA_MAP[schema].parse_url(_url)
if not results:
logger.error('Unparseable {} URL {}'.format(
SCHEMA_MAP[schema].service_name, url))
SCHEMA_MAP[schema].service_name, loggable_url))
return None
logger.trace('{} URL {} unpacked as:{}{}'.format(

View File

@ -216,7 +216,7 @@ def is_ipaddr(addr, ipv4=True, ipv6=True):
return False
def is_hostname(hostname, ipv4=True, ipv6=True):
def is_hostname(hostname, ipv4=True, ipv6=True, underscore=True):
"""
Validate hostname
"""
@ -244,10 +244,11 @@ def is_hostname(hostname, ipv4=True, ipv6=True):
# - Hostnames can not start with the hyphen (-) character.
# - as a workaround for https://github.com/docker/compose/issues/229 to
# being able to address services in other stacks, we also allow
# underscores in hostnames
# underscores in hostnames (if flag is set accordingly)
# - labels can not exceed 63 characters
allowed = re.compile(
r'^[a-z0-9][a-z0-9_-]{1,62}(?<!-)$',
r'^[a-z0-9][a-z0-9_-]{1,62}(?<![_-])$' if underscore else
r'^[a-z0-9][a-z0-9-]{1,62}(?<!-)$',
re.IGNORECASE,
)
@ -1016,6 +1017,174 @@ def validate_regex(value, regex=r'[^\s]+', flags=re.I, strip=True, fmt=None):
return value.strip() if strip else value
def cwe312_word(word, force=False, advanced=True, threshold=5):
"""
This function was written to help mask secure/private information that may
or may not be found within Apprise. The idea is to provide a presentable
word response that the user who prepared it would understand, yet not
reveal any private information for any potential intruder
For more detail see CWE-312 @
https://cwe.mitre.org/data/definitions/312.html
The `force` is an optional argument used to keep the string formatting
consistent and in one place. If set, the content passed in is presumed
to be containing secret information and will be updated accordingly.
If advanced is set to `True` then content is additionally checked for
upper/lower/ascii/numerical variances. If an obscurity threshold is
reached, then content is considered secret
"""
class Variance(object):
"""
A Simple List of Possible Character Variances
"""
# An Upper Case Character (ABCDEF... etc)
ALPHA_UPPER = '+'
# An Lower Case Character (abcdef... etc)
ALPHA_LOWER = '-'
# A Special Character ($%^;... etc)
SPECIAL = 's'
# A Numerical Character (1234... etc)
NUMERIC = 'n'
if not (isinstance(word, six.string_types) and word.strip()):
# not a password if it's not something we even support
return word
# Formatting
word = word.strip()
if force:
# We're forcing the representation to be a secret
# We do this for consistency
return '{}...{}'.format(word[0:1], word[-1:])
elif len(word) > 1 and \
not is_hostname(word, ipv4=True, ipv6=True, underscore=False):
# Verify if it is a hostname or not
return '{}...{}'.format(word[0:1], word[-1:])
elif len(word) >= 16:
# an IP will be 15 characters so we don't want to use a smaller
# value then 16 (e.g 101.102.103.104)
# we can assume very long words are passwords otherwise
return '{}...{}'.format(word[0:1], word[-1:])
if advanced:
#
# Mark word a secret based on it's obscurity
#
# Our variances will increase depending on these variables:
last_variance = None
obscurity = 0
for c in word:
# Detect our variance
if c.isdigit():
variance = Variance.NUMERIC
elif c.isalpha() and c.isupper():
variance = Variance.ALPHA_UPPER
elif c.isalpha() and c.islower():
variance = Variance.ALPHA_LOWER
else:
variance = Variance.SPECIAL
if last_variance != variance or variance == Variance.SPECIAL:
obscurity += 1
if obscurity >= threshold:
return '{}...{}'.format(word[0:1], word[-1:])
last_variance = variance
# Otherwise we're good; return our word
return word
def cwe312_url(url):
"""
This function was written to help mask secure/private information that may
or may not be found on an Apprise URL. The idea is to not disrupt the
structure of the previous URL too much, yet still protect the users
private information from being logged directly to screen.
For more detail see CWE-312 @
https://cwe.mitre.org/data/definitions/312.html
For example, consider the URL: http://user:password@localhost/
When passed into this function, the return value would be:
http://user:****@localhost/
Since apprise allows you to put private information everywhere in it's
custom URLs, it uses this function to manipulate the content before
returning to any kind of logger.
The idea is that the URL can still be interpreted by the person who
constructed them, but not to an intruder.
"""
# Parse our URL
results = parse_url(url)
if not results:
# Nothing was returned (invalid data was fed in); return our
# information as it was fed to us (without changing it)
return url
# Update our URL with values
results['password'] = cwe312_word(results['password'], force=True)
if not results['schema'].startswith('http'):
results['user'] = cwe312_word(results['user'])
results['host'] = cwe312_word(results['host'])
else:
results['host'] = cwe312_word(results['host'], advanced=False)
results['user'] = cwe312_word(results['user'], advanced=False)
# Apply our full path scan in all cases
results['fullpath'] = '/' + \
'/'.join([cwe312_word(x)
for x in re.split(
r'[\\/]+',
results['fullpath'].lstrip('/'))]) \
if results['fullpath'] else ''
#
# Now re-assemble our URL for display purposes
#
# Determine Authentication
auth = ''
if results['user'] and results['password']:
auth = '{user}:{password}@'.format(
user=results['user'],
password=results['password'],
)
elif results['user']:
auth = '{user}@'.format(
user=results['user'],
)
params = ''
if results['qsd']:
params = '?{}'.format(
"&".join(["{}={}".format(k, cwe312_word(v, force=(
k in ('password', 'secret', 'pass', 'token', 'key',
'id', 'apikey', 'to'))))
for k, v in results['qsd'].items()]))
return '{schema}://{auth}{hostname}{port}{fullpath}{params}'.format(
schema=results['schema'],
auth=auth,
# never encode hostname since we're expecting it to be a valid one
hostname=results['host'],
port='' if not results['port'] else ':{}'.format(results['port']),
fullpath=results['fullpath'] if results['fullpath'] else '',
params=params,
)
@contextlib.contextmanager
def environ(*remove, **update):
"""

View File

@ -190,7 +190,7 @@ def apprise_test(do_notify):
# We fail whenever we're initialized
raise TypeError()
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -204,7 +204,7 @@ def apprise_test(do_notify):
super(GoodNotification, self).__init__(
notify_format=NotifyFormat.HTML, **kwargs)
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -292,7 +292,7 @@ def apprise_test(do_notify):
# Pretend everything is okay
raise TypeError()
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -301,7 +301,7 @@ def apprise_test(do_notify):
# Pretend everything is okay
raise RuntimeError()
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -311,7 +311,7 @@ def apprise_test(do_notify):
# Pretend everything is okay
return False
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -346,7 +346,7 @@ def apprise_test(do_notify):
# Pretend everything is okay
raise TypeError()
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -744,7 +744,7 @@ def test_apprise_notify_formats(tmpdir):
# Pretend everything is okay
return True
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -760,7 +760,7 @@ def test_apprise_notify_formats(tmpdir):
# Pretend everything is okay
return True
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -776,7 +776,7 @@ def test_apprise_notify_formats(tmpdir):
# Pretend everything is okay
return True
def url(self):
def url(self, **kwargs):
# Support URL
return ''
@ -1066,7 +1066,7 @@ def test_apprise_details():
}
})
def url(self):
def url(self, **kwargs):
# Support URL
return ''

View File

@ -246,7 +246,7 @@ def test_apprise_multi_config_entries(tmpdir):
# Pretend everything is okay
return True
def url(self):
def url(self, **kwargs):
# support url()
return ''
@ -537,7 +537,7 @@ def test_apprise_config_with_apprise_obj(tmpdir):
# Pretend everything is okay
return True
def url(self):
def url(self, **kwargs):
# support url()
return ''

View File

@ -49,7 +49,7 @@ def test_apprise_asyncio_runtime_error():
super(GoodNotification, self).__init__(
notify_format=NotifyFormat.HTML, **kwargs)
def url(self):
def url(self, **kwargs):
# Support URL
return ''

View File

@ -69,7 +69,7 @@ def test_config_http(mock_post):
# Pretend everything is okay
return True
def url(self):
def url(self, **kwargs):
# Support url() function
return ''

View File

@ -28,7 +28,9 @@ import os
import sys
import mock
import pytest
import requests
from apprise import Apprise
from apprise import AppriseAsset
from apprise import URLBase
from apprise.logger import LogCapture
@ -620,3 +622,82 @@ def test_apprise_log_file_captures_py2(tmpdir):
# Disable Logging
logging.disable(logging.CRITICAL)
@pytest.mark.skipif(sys.version_info.major <= 2, reason="Requires Python 3.x+")
@mock.patch('requests.post')
def test_apprise_secure_logging(mock_post):
"""
API: Apprise() secure logging tests
"""
# Ensure we're not running in a disabled state
logging.disable(logging.NOTSET)
logger.setLevel(logging.CRITICAL)
# Prepare Mock
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
# Default Secure Logging is set to enabled
asset = AppriseAsset()
assert asset.secure_logging is True
# Load our asset
a = Apprise(asset=asset)
with LogCapture(level=logging.DEBUG) as stream:
# add a test server
assert a.add("json://user:pass1$-3!@localhost") is True
# Our servers should carry this flag
a[0].asset.secure_logging is True
logs = re.split(r'\r*\n', stream.getvalue().rstrip())
assert len(logs) == 1
entry = re.split(r'\s-\s', logs[0])
assert len(entry) == 3
assert entry[1] == 'DEBUG'
assert entry[2].startswith(
'Loaded JSON URL: json://user:****@localhost/')
# Send notification
assert a.notify("test") is True
# Test our call count
assert mock_post.call_count == 1
# Reset
mock_post.reset_mock()
# Now we test the reverse configuration and turn off
# secure logging.
# Default Secure Logging is set to disable
asset = AppriseAsset(secure_logging=False)
assert asset.secure_logging is False
# Load our asset
a = Apprise(asset=asset)
with LogCapture(level=logging.DEBUG) as stream:
# add a test server
assert a.add("json://user:pass1$-3!@localhost") is True
# Our servers should carry this flag
a[0].asset.secure_logging is False
logs = re.split(r'\r*\n', stream.getvalue().rstrip())
assert len(logs) == 1
entry = re.split(r'\s-\s', logs[0])
assert len(entry) == 3
assert entry[1] == 'DEBUG'
# Note that our password is no longer escaped (it is however
# url encoded)
assert entry[2].startswith(
'Loaded JSON URL: json://user:pass1%24-3%21@localhost/')
# Disable Logging
logging.disable(logging.CRITICAL)

View File

@ -554,6 +554,13 @@ def test_is_hostname():
assert utils.is_hostname('valid-underscores_in_host.ca') == \
'valid-underscores_in_host.ca'
# Underscores are supported by default
assert utils.is_hostname('valid_dashes_in_host.ca') == \
'valid_dashes_in_host.ca'
# However they are not if specified otherwise:
assert utils.is_hostname(
'valid_dashes_in_host.ca', underscore=False) is False
# Invalid Hostnames
assert utils.is_hostname('-hostname.that.starts.with.a.dash') is False
assert utils.is_hostname('invalid-characters_#^.ca') is False
@ -1493,3 +1500,65 @@ def test_apply_templating():
template, app_mode=utils.TemplateType.JSON,
**{'value': '"quotes are escaped"'})
assert result == '{value: "\\"quotes are escaped\\""}'
def test_cwe312_word():
"""utils: cwe312_word() testing
"""
assert utils.cwe312_word(None) is None
assert utils.cwe312_word(42) == 42
assert utils.cwe312_word('') == ''
assert utils.cwe312_word(' ') == ' '
assert utils.cwe312_word('!') == '!'
assert utils.cwe312_word('a') == 'a'
assert utils.cwe312_word('ab') == 'ab'
assert utils.cwe312_word('abc') == 'abc'
assert utils.cwe312_word('abcd') == 'abcd'
assert utils.cwe312_word('abcd', force=True) == 'a...d'
assert utils.cwe312_word('abc--d') == 'abc--d'
assert utils.cwe312_word('a-domain.ca') == 'a...a'
# Variances to still catch domain
assert utils.cwe312_word('a-domain.ca', advanced=False) == 'a-domain.ca'
assert utils.cwe312_word('a-domain.ca', threshold=6) == 'a-domain.ca'
def test_cwe312_url():
"""utils: cwe312_url() testing
"""
assert utils.cwe312_url(None) is None
assert utils.cwe312_url(42) == 42
assert utils.cwe312_url('http://') == 'http://'
assert utils.cwe312_url('discord://') == 'discord://'
assert utils.cwe312_url('path') == 'http://path'
assert utils.cwe312_url('path/') == 'http://path/'
# Now test http:// private data
assert utils.cwe312_url(
'http://user:pass123@localhost') == 'http://user:p...3@localhost'
assert utils.cwe312_url(
'http://user@localhost') == 'http://user@localhost'
assert utils.cwe312_url(
'http://user@localhost?password=abc123') == \
'http://user@localhost?password=a...3'
assert utils.cwe312_url(
'http://user@localhost?secret=secret-.12345') == \
'http://user@localhost?secret=s...5'
# Now test other:// private data
assert utils.cwe312_url(
'gitter://b5637831f563aa846bb5b2c27d8fe8f633b8f026/apprise') == \
'gitter://b...6/apprise'
assert utils.cwe312_url(
'gitter://b5637831f563aa846bb5b2c27d8fe8f633b8f026'
'/apprise/?pass=abc123') == \
'gitter://b...6/apprise?pass=a...3'
assert utils.cwe312_url(
'slack://mybot@xoxb-43598234231-3248932482278-BZK5Wj15B9mPh1RkShJoCZ44'
'/lead2gold@gmail.com') == 'slack://mybot@x...4/l...m'
assert utils.cwe312_url(
'slack://test@B4QP3WWB4/J3QWT41JM/XIl2ffpqXkzkwMXrJdevi7W3/'
'#random') == 'slack://test@B...4/J...M/X...3'