Fullpath bulletproofing on XML, JSON, and FORM plugins (#519)

This commit is contained in:
Chris Caron 2022-01-29 14:23:44 -05:00 committed by GitHub
parent cbbf8c3cf5
commit 8fa146685f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 289 additions and 16 deletions

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# Copyright (C) 2022 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
@ -138,7 +138,7 @@ class NotifyForm(NotifyBase):
self.fullpath = kwargs.get('fullpath')
if not isinstance(self.fullpath, six.string_types):
self.fullpath = '/'
self.fullpath = ''
self.method = self.template_args['method']['default'] \
if not isinstance(method, six.string_types) else method.upper()
@ -195,14 +195,15 @@ class NotifyForm(NotifyBase):
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}/?{params}'.format(
return '{schema}://{auth}{hostname}{port}{fullpath}?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyForm.quote(self.fullpath, safe='/'),
fullpath=NotifyForm.quote(self.fullpath, safe='/')
if self.fullpath else '/',
params=NotifyForm.urlencode(params),
)
@ -279,8 +280,8 @@ class NotifyForm(NotifyBase):
url += self.fullpath
self.logger.debug('Form POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
self.logger.debug('Form %s URL: %s (cert_verify=%r)' % (
self.method, url, self.verify_certificate,
))
self.logger.debug('Form Payload: %s' % str(payload))
@ -306,13 +307,14 @@ class NotifyForm(NotifyBase):
r = method(
url,
files=None if not files else files,
data=payload,
data=payload if self.method != 'GET' else None,
params=payload if self.method == 'GET' else None,
headers=headers,
auth=auth,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
if r.status_code < 200 or r.status_code >= 300:
# We had a problem
status_str = \
NotifyForm.http_response_code_lookup(r.status_code)

View File

@ -136,7 +136,7 @@ class NotifyJSON(NotifyBase):
self.fullpath = kwargs.get('fullpath')
if not isinstance(self.fullpath, six.string_types):
self.fullpath = '/'
self.fullpath = ''
self.method = self.template_args['method']['default'] \
if not isinstance(method, six.string_types) else method.upper()
@ -184,14 +184,15 @@ class NotifyJSON(NotifyBase):
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}/?{params}'.format(
return '{schema}://{auth}{hostname}{port}{fullpath}?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyJSON.quote(self.fullpath, safe='/'),
fullpath=NotifyJSON.quote(self.fullpath, safe='/')
if self.fullpath else '/',
params=NotifyJSON.urlencode(params),
)
@ -296,7 +297,7 @@ class NotifyJSON(NotifyBase):
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
if r.status_code < 200 or r.status_code >= 300:
# We had a problem
status_str = \
NotifyJSON.http_response_code_lookup(r.status_code)

View File

@ -157,7 +157,7 @@ class NotifyXML(NotifyBase):
self.fullpath = kwargs.get('fullpath')
if not isinstance(self.fullpath, six.string_types):
self.fullpath = '/'
self.fullpath = ''
self.method = self.template_args['method']['default'] \
if not isinstance(method, six.string_types) else method.upper()
@ -205,14 +205,15 @@ class NotifyXML(NotifyBase):
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}/?{params}'.format(
return '{schema}://{auth}{hostname}{port}{fullpath}?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyXML.quote(self.fullpath, safe='/'),
fullpath=NotifyXML.quote(self.fullpath, safe='/')
if self.fullpath else '/',
params=NotifyXML.urlencode(params),
)
@ -335,7 +336,7 @@ class NotifyXML(NotifyBase):
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
if r.status_code < 200 or r.status_code >= 300:
# We had a problem
status_str = \
NotifyXML.http_response_code_lookup(r.status_code)

View File

@ -224,3 +224,155 @@ def test_plugin_custom_form_attachments(mock_post):
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_plugin_custom_form_edge_cases(mock_get, mock_post):
"""
NotifyForm() Edge Cases
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Prepare our response
response = requests.Request()
response.status_code = requests.codes.ok
# Prepare Mock
mock_post.return_value = response
mock_get.return_value = response
results = plugins.NotifyForm.parse_url(
'form://localhost:8080/command?:abcd=test&method=POST')
assert isinstance(results, dict)
assert results['user'] is None
assert results['password'] is None
assert results['port'] == 8080
assert results['host'] == 'localhost'
assert results['fullpath'] == '/command'
assert results['path'] == '/'
assert results['query'] == 'command'
assert results['schema'] == 'form'
assert results['url'] == 'form://localhost:8080/command'
assert isinstance(results['qsd:'], dict) is True
assert results['qsd:']['abcd'] == 'test'
instance = plugins.NotifyForm(**results)
assert isinstance(instance, plugins.NotifyForm)
response = instance.send(title='title', body='body')
assert response is True
assert mock_post.call_count == 1
assert mock_get.call_count == 0
details = mock_post.call_args_list[0]
assert details[0][0] == 'http://localhost:8080/command'
assert 'abcd' in details[1]['data']
assert details[1]['data']['abcd'] == 'test'
assert 'title' in details[1]['data']
assert details[1]['data']['title'] == 'title'
assert 'message' in details[1]['data']
assert details[1]['data']['message'] == 'body'
assert instance.url(privacy=False).startswith(
'form://localhost:8080/command?')
# Generate a new URL based on our last and verify key values are the same
new_results = plugins.NotifyForm.parse_url(instance.url(safe=False))
for k in ('user', 'password', 'port', 'host', 'fullpath', 'path', 'query',
'schema', 'url', 'payload', 'method'):
assert new_results[k] == results[k]
# Reset our mock configuration
mock_post.reset_mock()
mock_get.reset_mock()
results = plugins.NotifyForm.parse_url(
'form://localhost:8080/command?:message=test&method=POST')
assert isinstance(results, dict)
assert results['user'] is None
assert results['password'] is None
assert results['port'] == 8080
assert results['host'] == 'localhost'
assert results['fullpath'] == '/command'
assert results['path'] == '/'
assert results['query'] == 'command'
assert results['schema'] == 'form'
assert results['url'] == 'form://localhost:8080/command'
assert isinstance(results['qsd:'], dict) is True
assert results['qsd:']['message'] == 'test'
instance = plugins.NotifyForm(**results)
assert isinstance(instance, plugins.NotifyForm)
response = instance.send(title='title', body='body')
assert response is True
assert mock_post.call_count == 1
assert mock_get.call_count == 0
details = mock_post.call_args_list[0]
assert details[0][0] == 'http://localhost:8080/command'
assert 'title' in details[1]['data']
assert details[1]['data']['title'] == 'title'
# 'body' is over-ridden by 'test' passed inline with the URL
assert 'message' in details[1]['data']
assert details[1]['data']['message'] == 'test'
assert instance.url(privacy=False).startswith(
'form://localhost:8080/command?')
# Generate a new URL based on our last and verify key values are the same
new_results = plugins.NotifyForm.parse_url(instance.url(safe=False))
for k in ('user', 'password', 'port', 'host', 'fullpath', 'path', 'query',
'schema', 'url', 'payload', 'method'):
assert new_results[k] == results[k]
# Reset our mock configuration
mock_post.reset_mock()
mock_get.reset_mock()
results = plugins.NotifyForm.parse_url(
'form://localhost:8080/command?:message=test&method=GET')
assert isinstance(results, dict)
assert results['user'] is None
assert results['password'] is None
assert results['port'] == 8080
assert results['host'] == 'localhost'
assert results['fullpath'] == '/command'
assert results['path'] == '/'
assert results['query'] == 'command'
assert results['schema'] == 'form'
assert results['url'] == 'form://localhost:8080/command'
assert isinstance(results['qsd:'], dict) is True
assert results['qsd:']['message'] == 'test'
instance = plugins.NotifyForm(**results)
assert isinstance(instance, plugins.NotifyForm)
response = instance.send(title='title', body='body')
assert response is True
assert mock_post.call_count == 0
assert mock_get.call_count == 1
details = mock_get.call_args_list[0]
assert details[0][0] == 'http://localhost:8080/command'
assert 'title' in details[1]['params']
assert details[1]['params']['title'] == 'title'
# 'body' is over-ridden by 'test' passed inline with the URL
assert 'message' in details[1]['params']
assert details[1]['params']['message'] == 'test'
assert instance.url(privacy=False).startswith(
'form://localhost:8080/command?')
# Generate a new URL based on our last and verify key values are the same
new_results = plugins.NotifyForm.parse_url(instance.url(safe=False))
for k in ('user', 'password', 'port', 'host', 'fullpath', 'path', 'query',
'schema', 'url', 'payload', 'method'):
assert new_results[k] == results[k]

View File

@ -22,6 +22,8 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import json
import mock
import requests
from apprise import plugins
from helpers import AppriseURLTester
@ -130,3 +132,63 @@ def test_plugin_custom_json_urls():
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_plugin_custom_json_edge_cases(mock_get, mock_post):
"""
NotifyJSON() Edge Cases
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Prepare our response
response = requests.Request()
response.status_code = requests.codes.ok
# Prepare Mock
mock_post.return_value = response
mock_get.return_value = response
results = plugins.NotifyJSON.parse_url(
'json://localhost:8080/command?:message=test&method=GET')
assert isinstance(results, dict)
assert results['user'] is None
assert results['password'] is None
assert results['port'] == 8080
assert results['host'] == 'localhost'
assert results['fullpath'] == '/command'
assert results['path'] == '/'
assert results['query'] == 'command'
assert results['schema'] == 'json'
assert results['url'] == 'json://localhost:8080/command'
assert isinstance(results['qsd:'], dict) is True
assert results['qsd:']['message'] == 'test'
instance = plugins.NotifyJSON(**results)
assert isinstance(instance, plugins.NotifyJSON)
response = instance.send(title='title', body='body')
assert response is True
assert mock_post.call_count == 0
assert mock_get.call_count == 1
details = mock_get.call_args_list[0]
assert details[0][0] == 'http://localhost:8080/command'
assert 'title' in details[1]['data']
dataset = json.loads(details[1]['data'])
assert dataset['title'] == 'title'
assert 'message' in dataset
assert dataset['message'] == 'body'
assert instance.url(privacy=False).startswith(
'json://localhost:8080/command?')
# Generate a new URL based on our last and verify key values are the same
new_results = plugins.NotifyJSON.parse_url(instance.url(safe=False))
for k in ('user', 'password', 'port', 'host', 'fullpath', 'path', 'query',
'schema', 'url', 'method'):
assert new_results[k] == results[k]

View File

@ -22,6 +22,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import mock
import requests
from apprise import plugins
from helpers import AppriseURLTester
@ -146,3 +147,57 @@ def test_plugin_custom_xml_urls():
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_plugin_custom_xml_edge_cases(mock_get, mock_post):
"""
NotifyXML() Edge Cases
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Prepare our response
response = requests.Request()
response.status_code = requests.codes.ok
# Prepare Mock
mock_post.return_value = response
mock_get.return_value = response
results = plugins.NotifyXML.parse_url(
'xml://localhost:8080/command?:message=test&method=GET')
assert isinstance(results, dict)
assert results['user'] is None
assert results['password'] is None
assert results['port'] == 8080
assert results['host'] == 'localhost'
assert results['fullpath'] == '/command'
assert results['path'] == '/'
assert results['query'] == 'command'
assert results['schema'] == 'xml'
assert results['url'] == 'xml://localhost:8080/command'
assert isinstance(results['qsd:'], dict) is True
assert results['qsd:']['message'] == 'test'
instance = plugins.NotifyXML(**results)
assert isinstance(instance, plugins.NotifyXML)
response = instance.send(title='title', body='body')
assert response is True
assert mock_post.call_count == 0
assert mock_get.call_count == 1
details = mock_get.call_args_list[0]
assert details[0][0] == 'http://localhost:8080/command'
assert instance.url(privacy=False).startswith(
'xml://localhost:8080/command?')
# Generate a new URL based on our last and verify key values are the same
new_results = plugins.NotifyXML.parse_url(instance.url(safe=False))
for k in ('user', 'password', 'port', 'host', 'fullpath', 'path', 'query',
'schema', 'url', 'method'):
assert new_results[k] == results[k]