mirror of
https://github.com/caronc/apprise.git
synced 2024-11-23 08:33:21 +01:00
Do not sanitize http:// attachment URLs (#1122)
This commit is contained in:
parent
5fd912f5fe
commit
fa2d338568
@ -315,7 +315,7 @@ class AttachBase(URLBase):
|
|||||||
"download() is implimented by the child class.")
|
"download() is implimented by the child class.")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def parse_url(url, verify_host=True, mimetype_db=None):
|
def parse_url(url, verify_host=True, mimetype_db=None, sanitize=True):
|
||||||
"""Parses the URL and returns it broken apart into a dictionary.
|
"""Parses the URL and returns it broken apart into a dictionary.
|
||||||
|
|
||||||
This is very specific and customized for Apprise.
|
This is very specific and customized for Apprise.
|
||||||
@ -333,7 +333,8 @@ class AttachBase(URLBase):
|
|||||||
successful, otherwise None is returned.
|
successful, otherwise None is returned.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
results = URLBase.parse_url(url, verify_host=verify_host)
|
results = URLBase.parse_url(
|
||||||
|
url, verify_host=verify_host, sanitize=sanitize)
|
||||||
|
|
||||||
if not results:
|
if not results:
|
||||||
# We're done; we failed to parse our url
|
# We're done; we failed to parse our url
|
||||||
|
@ -296,8 +296,7 @@ class AttachHTTP(AttachBase):
|
|||||||
"""
|
"""
|
||||||
Tidy memory if open
|
Tidy memory if open
|
||||||
"""
|
"""
|
||||||
with self._lock:
|
self.invalidate()
|
||||||
self.invalidate()
|
|
||||||
|
|
||||||
def url(self, privacy=False, *args, **kwargs):
|
def url(self, privacy=False, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
@ -363,8 +362,7 @@ class AttachHTTP(AttachBase):
|
|||||||
us to re-instantiate this object.
|
us to re-instantiate this object.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
results = AttachBase.parse_url(url)
|
results = AttachBase.parse_url(url, sanitize=False)
|
||||||
|
|
||||||
if not results:
|
if not results:
|
||||||
# We're done early as we couldn't load the results
|
# We're done early as we couldn't load the results
|
||||||
return results
|
return results
|
||||||
|
@ -744,7 +744,7 @@ class URLBase:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def parse_url(url, verify_host=True, plus_to_space=False,
|
def parse_url(url, verify_host=True, plus_to_space=False,
|
||||||
strict_port=False):
|
strict_port=False, sanitize=True):
|
||||||
"""Parses the URL and returns it broken apart into a dictionary.
|
"""Parses the URL and returns it broken apart into a dictionary.
|
||||||
|
|
||||||
This is very specific and customized for Apprise.
|
This is very specific and customized for Apprise.
|
||||||
@ -765,7 +765,8 @@ class URLBase:
|
|||||||
|
|
||||||
results = parse_url(
|
results = parse_url(
|
||||||
url, default_schema='unknown', verify_host=verify_host,
|
url, default_schema='unknown', verify_host=verify_host,
|
||||||
plus_to_space=plus_to_space, strict_port=strict_port)
|
plus_to_space=plus_to_space, strict_port=strict_port,
|
||||||
|
sanitize=sanitize)
|
||||||
|
|
||||||
if not results:
|
if not results:
|
||||||
# We're done; we failed to parse our url
|
# We're done; we failed to parse our url
|
||||||
|
@ -541,7 +541,7 @@ def tidy_path(path):
|
|||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
||||||
def parse_qsd(qs, simple=False, plus_to_space=False):
|
def parse_qsd(qs, simple=False, plus_to_space=False, sanitize=True):
|
||||||
"""
|
"""
|
||||||
Query String Dictionary Builder
|
Query String Dictionary Builder
|
||||||
|
|
||||||
@ -568,6 +568,8 @@ def parse_qsd(qs, simple=False, plus_to_space=False):
|
|||||||
per normal URL Encoded defininition. Normal URL parsing applies
|
per normal URL Encoded defininition. Normal URL parsing applies
|
||||||
this, but `+` is very actively used character with passwords,
|
this, but `+` is very actively used character with passwords,
|
||||||
api keys, tokens, etc. So Apprise does not do this by default.
|
api keys, tokens, etc. So Apprise does not do this by default.
|
||||||
|
|
||||||
|
if sanitize is set to False, then kwargs are not placed into lowercase
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Our return result set:
|
# Our return result set:
|
||||||
@ -608,7 +610,7 @@ def parse_qsd(qs, simple=False, plus_to_space=False):
|
|||||||
|
|
||||||
# Always Query String Dictionary (qsd) for every entry we have
|
# Always Query String Dictionary (qsd) for every entry we have
|
||||||
# content is always made lowercase for easy indexing
|
# content is always made lowercase for easy indexing
|
||||||
result['qsd'][key.lower().strip()] = val
|
result['qsd'][key.lower().strip() if sanitize else key] = val
|
||||||
|
|
||||||
if simple:
|
if simple:
|
||||||
# move along
|
# move along
|
||||||
@ -636,7 +638,7 @@ def parse_qsd(qs, simple=False, plus_to_space=False):
|
|||||||
|
|
||||||
|
|
||||||
def parse_url(url, default_schema='http', verify_host=True, strict_port=False,
|
def parse_url(url, default_schema='http', verify_host=True, strict_port=False,
|
||||||
simple=False, plus_to_space=False):
|
simple=False, plus_to_space=False, sanitize=True):
|
||||||
"""A function that greatly simplifies the parsing of a url
|
"""A function that greatly simplifies the parsing of a url
|
||||||
specified by the end user.
|
specified by the end user.
|
||||||
|
|
||||||
@ -691,6 +693,8 @@ def parse_url(url, default_schema='http', verify_host=True, strict_port=False,
|
|||||||
|
|
||||||
If the URL can't be parsed then None is returned
|
If the URL can't be parsed then None is returned
|
||||||
|
|
||||||
|
If sanitize is set to False, then kwargs are not placed in lowercase
|
||||||
|
and wrapping whitespace is not removed
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not isinstance(url, str):
|
if not isinstance(url, str):
|
||||||
@ -750,7 +754,8 @@ def parse_url(url, default_schema='http', verify_host=True, strict_port=False,
|
|||||||
# while ensuring that all keys are lowercase
|
# while ensuring that all keys are lowercase
|
||||||
if qsdata:
|
if qsdata:
|
||||||
result.update(parse_qsd(
|
result.update(parse_qsd(
|
||||||
qsdata, simple=simple, plus_to_space=plus_to_space))
|
qsdata, simple=simple, plus_to_space=plus_to_space,
|
||||||
|
sanitize=sanitize))
|
||||||
|
|
||||||
# Now do a proper extraction of data; http:// is just substitued in place
|
# Now do a proper extraction of data; http:// is just substitued in place
|
||||||
# to allow urlparse() to function as expected, we'll swap this back to the
|
# to allow urlparse() to function as expected, we'll swap this back to the
|
||||||
|
@ -719,6 +719,38 @@ def test_parse_url_general():
|
|||||||
assert result['qsd+'] == {}
|
assert result['qsd+'] == {}
|
||||||
assert result['qsd:'] == {}
|
assert result['qsd:'] == {}
|
||||||
|
|
||||||
|
# Sanitizing
|
||||||
|
result = utils.parse_url(
|
||||||
|
'hTTp://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C&:cOlON=YeS',
|
||||||
|
sanitize=False)
|
||||||
|
|
||||||
|
assert len(result['qsd-']) == 1
|
||||||
|
assert len(result['qsd+']) == 1
|
||||||
|
assert len(result['qsd']) == 4
|
||||||
|
assert len(result['qsd:']) == 1
|
||||||
|
|
||||||
|
assert result['schema'] == 'http'
|
||||||
|
assert result['host'] == 'hostname'
|
||||||
|
assert result['port'] is None
|
||||||
|
assert result['user'] is None
|
||||||
|
assert result['password'] is None
|
||||||
|
assert result['fullpath'] == '/'
|
||||||
|
assert result['path'] == '/'
|
||||||
|
assert result['query'] is None
|
||||||
|
assert result['url'] == 'http://hostname/'
|
||||||
|
assert '+KeY' in result['qsd']
|
||||||
|
assert '-kEy' in result['qsd']
|
||||||
|
assert ':cOlON' in result['qsd']
|
||||||
|
assert result['qsd:']['cOlON'] == 'YeS'
|
||||||
|
assert 'key' not in result['qsd']
|
||||||
|
assert 'KeY' in result['qsd+']
|
||||||
|
assert result['qsd+']['KeY'] == 'ValueA'
|
||||||
|
assert 'kEy' in result['qsd-']
|
||||||
|
assert result['qsd-']['kEy'] == 'ValueB'
|
||||||
|
assert result['qsd']['KEY'] == 'Value +C'
|
||||||
|
assert result['qsd']['+KeY'] == result['qsd+']['KeY']
|
||||||
|
assert result['qsd']['-kEy'] == result['qsd-']['kEy']
|
||||||
|
|
||||||
|
|
||||||
def test_parse_url_simple():
|
def test_parse_url_simple():
|
||||||
"utils: parse_url() testing """
|
"utils: parse_url() testing """
|
||||||
|
@ -190,7 +190,7 @@ def test_attach_http(mock_get, mock_post):
|
|||||||
|
|
||||||
# Test custom url get parameters
|
# Test custom url get parameters
|
||||||
results = AttachHTTP.parse_url(
|
results = AttachHTTP.parse_url(
|
||||||
'http://user:pass@localhost/apprise.gif?dl=1&cache=300')
|
'http://user:pass@localhost/apprise.gif?DL=1&cache=300')
|
||||||
assert isinstance(results, dict)
|
assert isinstance(results, dict)
|
||||||
attachment = AttachHTTP(**results)
|
attachment = AttachHTTP(**results)
|
||||||
assert isinstance(attachment.url(), str) is True
|
assert isinstance(attachment.url(), str) is True
|
||||||
@ -200,12 +200,16 @@ def test_attach_http(mock_get, mock_post):
|
|||||||
assert attachment
|
assert attachment
|
||||||
assert mock_get.call_count == 1
|
assert mock_get.call_count == 1
|
||||||
assert 'params' in mock_get.call_args_list[0][1]
|
assert 'params' in mock_get.call_args_list[0][1]
|
||||||
assert 'dl' in mock_get.call_args_list[0][1]['params']
|
assert 'DL' in mock_get.call_args_list[0][1]['params']
|
||||||
|
|
||||||
# Verify that arguments that are reserved for apprise are not
|
# Verify that arguments that are reserved for apprise are not
|
||||||
# passed along
|
# passed along
|
||||||
assert 'cache' not in mock_get.call_args_list[0][1]['params']
|
assert 'cache' not in mock_get.call_args_list[0][1]['params']
|
||||||
|
|
||||||
|
with mock.patch('os.unlink', side_effect=OSError()):
|
||||||
|
# Test invalidation with exception thrown
|
||||||
|
attachment.invalidate()
|
||||||
|
|
||||||
results = AttachHTTP.parse_url(
|
results = AttachHTTP.parse_url(
|
||||||
'http://user:pass@localhost/apprise.gif?+key=value&cache=True')
|
'http://user:pass@localhost/apprise.gif?+key=value&cache=True')
|
||||||
assert isinstance(results, dict)
|
assert isinstance(results, dict)
|
||||||
|
Loading…
Reference in New Issue
Block a user