100% test coverage + handling of test warnings (#134)

* 100% test coverage again + handling of test warnings
* dropped unreferenced dependencies
This commit is contained in:
Chris Caron 2019-06-30 15:32:12 -04:00 committed by GitHub
parent 084c14fee9
commit f3d335b748
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 263 additions and 120 deletions

View File

@ -669,4 +669,21 @@ class NotifyEmby(NotifyBase):
"""
Deconstructor
"""
self.logout()
try:
self.logout()
except LookupError:
# Python v3.5 call to requests can sometimes throw the exception
# "/usr/lib64/python3.7/socket.py", line 748, in getaddrinfo
# LookupError: unknown encoding: idna
#
# This occurs every time when running unit-tests against Apprise:
# LANG=C.UTF-8 PYTHONPATH=$(pwd) py.test-3.7
#
# There has been an open issue on this since Jan 2017.
# - https://bugs.python.org/issue29288
#
# A ~similar~ issue can be identified here in the requests
# ticket system as unresolved and has provided work-arounds
# - https://github.com/kennethreitz/requests/issues/3578
pass

View File

@ -52,8 +52,6 @@
import requests
import re
from os.path import basename
from json import loads
from json import dumps
@ -227,56 +225,70 @@ class NotifyTelegram(NotifyBase):
if not path:
# No image to send
self.logger.debug(
'Telegram Image does not exist for %s' % (notify_type))
'Telegram image does not exist for %s' % (notify_type))
# No need to fail; we may have been configured this way through
# the apprise.AssetObject()
return True
# Configure file payload (for upload)
files = {
'photo': (basename(path), open(path), 'rb'),
}
payload = {
'chat_id': chat_id,
}
self.logger.debug(
'Telegram Image POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate))
try:
r = requests.post(
url,
files=files,
data=payload,
verify=self.verify_certificate,
)
with open(path, 'rb') as f:
# Configure file payload (for upload)
files = {
'photo': f,
}
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyTelegram.http_response_code_lookup(r.status_code)
payload = {
'chat_id': chat_id,
}
self.logger.warning(
'Failed to send Telegram Image: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Telegram image POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
try:
r = requests.post(
url,
files=files,
data=payload,
verify=self.verify_certificate,
)
return False
if r.status_code != requests.codes.ok:
# We had a problem
status_str = NotifyTelegram\
.http_response_code_lookup(r.status_code)
except requests.RequestException as e:
self.logger.warning(
'A connection error occured posting Telegram Image.')
self.logger.debug('Socket Exception: %s' % str(e))
return False
self.logger.warning(
'Failed to send Telegram image: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
return True
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
except requests.RequestException as e:
self.logger.warning(
'A connection error occured posting Telegram image.')
self.logger.debug('Socket Exception: %s' % str(e))
return False
return True
except (IOError, OSError):
# IOError is present for backwards compatibility with Python
# versions older then 3.3. >= 3.3 throw OSError now.
# Could not open and/or read the file; this is not a problem since
# we scan a lot of default paths.
self.logger.error(
'File can not be opened for read: {}'.format(path))
return False
def detect_bot_owner(self):
"""

View File

@ -42,12 +42,16 @@ except ImportError:
from urllib.parse import urlparse
# URL Indexing Table for returns via parse_url()
# The below accepts and scans for:
# - schema://
# - schema://path
# - schema://path?kwargs
#
VALID_URL_RE = re.compile(
r'^[\s]*(?P<schema>[^:\s]+):[/\\]*(?P<path>[^?]+)'
r'(\?(?P<kwargs>.+))?[\s]*$',
r'^[\s]*((?P<schema>[^:\s]+):[/\\]+)?((?P<path>[^?]+)'
r'(\?(?P<kwargs>.+))?)?[\s]*$',
)
VALID_HOST_RE = re.compile(r'^[\s]*(?P<path>[^?\s]+)(\?(?P<kwargs>.+))?')
VALID_QUERY_RE = re.compile(r'^(?P<path>.*[/\\])(?P<query>[^/\\]*)$')
VALID_QUERY_RE = re.compile(r'^(?P<path>.*[/\\])(?P<query>[^/\\]+)?$')
# delimiters used to separate values when content is passed in by string.
# This is useful when turning a string into a list
@ -251,6 +255,7 @@ def parse_url(url, default_schema='http', verify_host=True):
<schema>://<host>:<port>/<path>
<schema>://<host>/<path>
<schema>://<host>
<host>
Argument parsing is also supported:
<schema>://<user>@<host>:<port>/<path>?key1=val&key2=val2
@ -277,7 +282,7 @@ def parse_url(url, default_schema='http', verify_host=True):
# The port (if specified)
'port': None,
# The hostname
'host': None,
'host': '',
# The full path (query + path)
'fullpath': None,
# The path
@ -304,41 +309,30 @@ def parse_url(url, default_schema='http', verify_host=True):
qsdata = ''
match = VALID_URL_RE.search(url)
if match:
# Extract basic results
result['schema'] = match.group('schema').lower().strip()
host = match.group('path').strip()
try:
qsdata = match.group('kwargs').strip()
except AttributeError:
# No qsdata
pass
# Extract basic results (with schema present)
result['schema'] = match.group('schema').lower().strip() \
if match.group('schema') else default_schema
host = match.group('path').strip() \
if match.group('path') else ''
qsdata = match.group('kwargs').strip() \
if match.group('kwargs') else None
else:
match = VALID_HOST_RE.search(url)
if not match:
return None
result['schema'] = default_schema
host = match.group('path').strip()
try:
qsdata = match.group('kwargs').strip()
except AttributeError:
# No qsdata
pass
# Could not extract basic content from the URL
return None
# Parse Query Arugments ?val=key&key=val
# while ensuring that all keys are lowercase
if qsdata:
result.update(parse_qsd(qsdata))
# Now do a proper extraction of data
# Now do a proper extraction of data; http:// is just substitued in place
# to allow urlparse() to function as expected, we'll swap this back to the
# expected schema after.
parsed = urlparse('http://%s' % host)
# Parse results
result['host'] = parsed[1].strip()
if not result['host']:
# Nothing more we can do without a hostname
return None
result['fullpath'] = quote(unquote(tidy_path(parsed[2].strip())))
try:
@ -359,11 +353,10 @@ def parse_url(url, default_schema='http', verify_host=True):
else:
# Using full path, extract query from path
match = VALID_QUERY_RE.search(result['fullpath'])
if match:
result['path'] = match.group('path')
result['query'] = match.group('query')
if not result['query']:
result['query'] = None
result['path'] = match.group('path')
result['query'] = match.group('query')
if not result['query']:
result['query'] = None
try:
(result['user'], result['host']) = \
re.split(r'[@]+', result['host'])[:2]

View File

@ -77,7 +77,6 @@ BuildRequires: python2-devel
BuildRequires: python-decorator
BuildRequires: python-requests
BuildRequires: python2-requests-oauthlib
BuildRequires: python2-oauthlib
BuildRequires: python-six
BuildRequires: python2-click >= 5.0
BuildRequires: python-markdown
@ -92,7 +91,6 @@ BuildRequires: python2-yaml
Requires: python-decorator
Requires: python-requests
Requires: python2-requests-oauthlib
Requires: python2-oauthlib
Requires: python-six
Requires: python-markdown
%if 0%{?rhel} && 0%{?rhel} <= 7

View File

@ -1,8 +1,6 @@
decorator
requests
requests-oauthlib
oauthlib
urllib3
six
click >= 5.0
markdown

View File

@ -1074,12 +1074,21 @@ def test_apprise_details_plugin_verification():
# 2 entries (name, and alias_of only!)
assert len(entry['details'][section][key]) == 1
# inspect our object
spec = inspect.getargspec(SCHEMA_MAP[protocols[0]].__init__)
if six.PY2:
# inspect our object
# getargspec() is depricated in Python v3
spec = inspect.getargspec(SCHEMA_MAP[protocols[0]].__init__)
function_args = \
(set(parse_list(spec.keywords)) - set(['kwargs'])) \
| (set(spec.args) - set(['self'])) | valid_kwargs
function_args = \
(set(parse_list(spec.keywords)) - set(['kwargs'])) \
| (set(spec.args) - set(['self'])) | valid_kwargs
else:
# Python v3+ uses getfullargspec()
spec = inspect.getfullargspec(SCHEMA_MAP[protocols[0]].__init__)
function_args = \
(set(parse_list(spec.varkw)) - set(['kwargs'])) \
| (set(spec.args) - set(['self'])) | valid_kwargs
# Iterate over our map_to_entries and make sure that everything
# maps to a function argument

View File

@ -22,7 +22,6 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import print_function
import mock
from apprise import cli

View File

@ -65,7 +65,8 @@ TEST_URLS = (
# NotifyBoxcar
##################################
('boxcar://', {
'instance': None,
# invalid secret key
'instance': TypeError,
}),
# A a bad url
('boxcar://:@/', {
@ -131,8 +132,8 @@ TEST_URLS = (
# NotifyD7Networks
##################################
('d7sms://', {
# No token specified
'instance': None,
# No target numbers
'instance': TypeError,
}),
('d7sms://:@/', {
# invalid user/pass
@ -1108,7 +1109,8 @@ TEST_URLS = (
# NotifyMSTeams
##################################
('msteams://', {
'instance': None,
# First API Token not specified
'instance': TypeError,
}),
('msteams://:@/', {
# We don't have strict host checking on for msteams, so this URL
@ -2092,8 +2094,8 @@ TEST_URLS = (
# NotifyTwilio
##################################
('twilio://', {
# No token specified
'instance': None,
# No Account SID specified
'instance': TypeError,
}),
('twilio://:@/', {
# invalid Auth token
@ -2173,7 +2175,8 @@ TEST_URLS = (
# NotifyTwitter
##################################
('twitter://', {
'instance': None,
# Missing Consumer API Key
'instance': TypeError,
}),
('twitter://:@/', {
'instance': TypeError,
@ -2319,8 +2322,8 @@ TEST_URLS = (
# NotifyNexmo
##################################
('nexmo://', {
# No secret and or key specified
'instance': None,
# No API Key specified
'instance': TypeError,
}),
('nexmo://:@/', {
# invalid Auth key
@ -2392,7 +2395,8 @@ TEST_URLS = (
# NotifyWebexTeams
##################################
('wxteams://', {
'instance': None,
# Teams Token missing
'instance': TypeError,
}),
('wxteams://:@/', {
# We don't have strict host checking on for wxteams, so this URL
@ -2747,6 +2751,11 @@ def test_rest_plugins(mock_post, mock_get):
# We loaded okay; now lets make sure we can reverse this url
assert isinstance(obj.url(), six.string_types) is True
# Some Simple Invalid Instance Testing
assert instance.parse_url(None) is None
assert instance.parse_url(object) is None
assert instance.parse_url(42) is None
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
@ -2761,6 +2770,9 @@ def test_rest_plugins(mock_post, mock_get):
url, obj.url()))
assert False
# Tidy our object
del obj_cmp
if self:
# Iterate over our expected entries inside of our object
for key, val in self.items():
@ -2867,6 +2879,10 @@ def test_rest_plugins(mock_post, mock_get):
if not isinstance(e, response):
raise
# Tidy our object and allow any possible defined deconstructors to
# be executed.
del obj
except AssertionError:
# Don't mess with these entries
print('%s AssertionError' % url)
@ -3430,8 +3446,22 @@ def test_notify_emby_plugin_logout(mock_post, mock_get, mock_login):
# Disable the port completely
obj.port = None
# Perform logout
obj.logout()
# Calling logout on an object already logged out
obj.logout()
# Test Python v3.5 LookupError Bug: https://bugs.python.org/issue29288
mock_post.side_effect = LookupError()
mock_get.side_effect = LookupError()
obj.access_token = 'abc'
obj.user_id = '123'
# Tidy object
del obj
@mock.patch('apprise.plugins.NotifyEmby.sessions')
@mock.patch('apprise.plugins.NotifyEmby.login')
@ -3509,6 +3539,9 @@ def test_notify_emby_plugin_notify(mock_post, mock_get, mock_logout,
mock_sessions.return_value = {}
assert obj.notify('title', 'body', 'info') is True
# Tidy our object
del obj
@mock.patch('requests.get')
@mock.patch('requests.post')
@ -3800,11 +3833,6 @@ def test_notify_pushbullet_plugin(mock_post, mock_get):
# recipient here
assert len(obj.targets) == 1
# Support the handling of an empty and invalid URL strings
assert plugins.NotifyPushBullet.parse_url(None) is None
assert plugins.NotifyPushBullet.parse_url('') is None
assert plugins.NotifyPushBullet.parse_url(42) is None
@mock.patch('requests.get')
@mock.patch('requests.post')
@ -3865,11 +3893,6 @@ def test_notify_pushed_plugin(mock_post, mock_get):
assert len(obj.channels) == 2
assert len(obj.users) == 2
# Support the handling of an empty and invalid URL strings
assert plugins.NotifyPushed.parse_url(None) is None
assert plugins.NotifyPushed.parse_url('') is None
assert plugins.NotifyPushed.parse_url(42) is None
# Prepare Mock to fail
mock_post.return_value.status_code = requests.codes.internal_server_error
mock_get.return_value.status_code = requests.codes.internal_server_error
@ -3933,11 +3956,6 @@ def test_notify_pushover_plugin(mock_post, mock_get):
# device defined here
assert len(obj.targets) == 1
# Support the handling of an empty and invalid URL strings
assert plugins.NotifyPushover.parse_url(None) is None
assert plugins.NotifyPushover.parse_url('') is None
assert plugins.NotifyPushover.parse_url(42) is None
@mock.patch('requests.get')
@mock.patch('requests.post')
@ -3986,11 +4004,6 @@ def test_notify_rocketchat_plugin(mock_post, mock_get):
#
assert obj.logout() is True
# Support the handling of an empty and invalid URL strings
assert plugins.NotifyRocketChat.parse_url(None) is None
assert plugins.NotifyRocketChat.parse_url('') is None
assert plugins.NotifyRocketChat.parse_url(42) is None
# Prepare Mock to fail
mock_post.return_value.status_code = requests.codes.internal_server_error
mock_get.return_value.status_code = requests.codes.internal_server_error
@ -4089,21 +4102,28 @@ def test_notify_telegram_plugin(mock_post, mock_get):
# specified
assert True
obj = plugins.NotifyTelegram(bot_token=bot_token, targets=chat_ids)
obj = plugins.NotifyTelegram(
bot_token=bot_token, targets=chat_ids, include_image=True)
assert isinstance(obj, plugins.NotifyTelegram) is True
assert len(obj.targets) == 2
# Test Image Sending Exceptions
mock_get.side_effect = IOError()
mock_post.side_effect = IOError()
obj.send_image(obj.targets[0], NotifyType.INFO)
# Restore their entries
mock_get.side_effect = None
mock_post.side_effect = None
mock_get.return_value.content = '{}'
mock_post.return_value.content = '{}'
# test url call
assert isinstance(obj.url(), six.string_types) is True
# Test that we can load the string we generate back:
obj = plugins.NotifyTelegram(**plugins.NotifyTelegram.parse_url(obj.url()))
assert isinstance(obj, plugins.NotifyTelegram) is True
# Support the handling of an empty and invalid URL strings
assert plugins.NotifyTelegram.parse_url(None) is None
assert plugins.NotifyTelegram.parse_url('') is None
assert plugins.NotifyTelegram.parse_url(42) is None
# Prepare Mock to fail
response = mock.Mock()
response.status_code = requests.codes.internal_server_error

View File

@ -333,6 +333,103 @@ def test_parse_url():
assert(result['qsd-'] == {})
assert(result['qsd+'] == {})
# Test some illegal strings
result = utils.parse_url(object, verify_host=False)
assert result is None
result = utils.parse_url(None, verify_host=False)
assert result is None
# Just a schema; invalid host
result = utils.parse_url('test://')
assert result is None
# Do it again without host validation
result = utils.parse_url('test://', verify_host=False)
assert(result['schema'] == 'test')
# It's worth noting that the hostname is an empty string and is NEVER set
# to None if it wasn't specified.
assert result['host'] == ''
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
assert result['url'] == 'test://'
assert result['qsd'] == {}
assert result['qsd-'] == {}
assert result['qsd+'] == {}
result = utils.parse_url('testhostname')
assert result['schema'] == 'http'
assert result['host'] == 'testhostname'
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
# The default_schema kicks in here
assert result['url'] == 'http://testhostname'
assert result['qsd'] == {}
assert result['qsd-'] == {}
result = utils.parse_url('example.com', default_schema='unknown')
assert result['schema'] == 'unknown'
assert result['host'] == 'example.com'
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
# The default_schema kicks in here
assert result['url'] == 'unknown://example.com'
assert result['qsd'] == {}
assert result['qsd-'] == {}
# An empty string without a hostame is still valid if verify_host is set
result = utils.parse_url('', verify_host=False)
assert result['schema'] == 'http'
assert result['host'] == ''
assert result['port'] is None
assert result['user'] is None
assert result['password'] is None
assert result['fullpath'] is None
assert result['path'] is None
assert result['query'] is None
# The default_schema kicks in here
assert result['url'] == 'http://'
assert result['qsd'] == {}
assert result['qsd-'] == {}
# A messed up URL
result = utils.parse_url('test://:@/', verify_host=False)
assert result['schema'] == 'test'
assert result['host'] == ''
assert result['port'] is None
assert result['user'] == ''
assert result['password'] == ''
assert result['fullpath'] == '/'
assert result['path'] == '/'
assert result['query'] is None
assert result['url'] == 'test://:@/'
assert result['qsd'] == {}
assert result['qsd-'] == {}
result = utils.parse_url('crazy://:@//_/@^&/jack.json', verify_host=False)
assert result['schema'] == 'crazy'
assert result['host'] == ''
assert result['port'] is None
assert result['user'] == ''
assert result['password'] == ''
assert(unquote(result['fullpath']) == '/_/@^&/jack.json')
assert(unquote(result['path']) == '/_/@^&/')
assert result['query'] == 'jack.json'
assert(unquote(result['url']) == 'crazy://:@/_/@^&/jack.json')
assert result['qsd'] == {}
assert result['qsd-'] == {}
def test_parse_bool():
"utils: parse_bool() testing """