Refactored configuration + cache support added (#175)

This commit is contained in:
Chris Caron 2019-11-17 15:51:40 -05:00 committed by GitHub
parent e5c0334b90
commit c8a5d3b0c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 409 additions and 141 deletions

View File

@ -1,6 +1,8 @@
## Description:
**Related issue (if applicable):** fixes #<!--apprise issue number goes here-->
<!-- Have anything else to describe? Define it here -->
## New Service Completion Status
<!-- This section is only applicable if you're adding a new service -->
* [ ] apprise/plugins/Notify<!--ServiceName goes here-->.py

View File

@ -56,8 +56,19 @@ class AppriseConfig(object):
If no path is specified then a default list is used.
If cache is set to True, then after the data is loaded, it's cached
within this object so it isn't retrieved again later.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. Setting this to False does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled and you're set up to
make remote calls. Only disable caching if you understand the
consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
It's also worth nothing that the cache value is only set to elements
that are not already of subclass ConfigBase()
"""
# Initialize a server list of URLs
@ -67,24 +78,43 @@ class AppriseConfig(object):
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Set our cache flag
self.cache = cache
if paths is not None:
# Store our path(s)
self.add(paths)
return
def add(self, configs, asset=None, tag=None):
def add(self, configs, asset=None, tag=None, cache=True):
"""
Adds one or more config URLs into our list.
You can override the global asset if you wish by including it with the
config(s) that you add.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. Setting this to False does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled and you're set up to
make remote calls. Only disable caching if you understand the
consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
It's also worth nothing that the cache value is only set to elements
that are not already of subclass ConfigBase()
"""
# Initialize our return status
return_status = True
# Initialize our default cache value
cache = cache if cache is not None else self.cache
if isinstance(asset, AppriseAsset):
# prepare default asset
asset = self.asset
@ -104,7 +134,7 @@ class AppriseConfig(object):
'specified.'.format(type(configs)))
return False
# Iterate over our
# Iterate over our configuration
for _config in configs:
if isinstance(_config, ConfigBase):
@ -123,7 +153,8 @@ class AppriseConfig(object):
# Instantiate ourselves an object, this function throws or
# returns None if it fails
instance = AppriseConfig.instantiate(_config, asset=asset, tag=tag)
instance = AppriseConfig.instantiate(
_config, asset=asset, tag=tag, cache=cache)
if not isinstance(instance, ConfigBase):
return_status = False
continue
@ -134,7 +165,7 @@ class AppriseConfig(object):
# Return our status
return return_status
def servers(self, tag=MATCH_ALL_TAG, cache=True):
def servers(self, tag=MATCH_ALL_TAG, *args, **kwargs):
"""
Returns all of our servers dynamically build based on parsed
configuration.
@ -165,15 +196,16 @@ class AppriseConfig(object):
logic=tag, data=entry.tags, match_all=MATCH_ALL_TAG):
# Build ourselves a list of services dynamically and return the
# as a list
response.extend(entry.servers(cache=cache))
response.extend(entry.servers())
return response
@staticmethod
def instantiate(url, asset=None, tag=None, suppress_exceptions=True):
def instantiate(url, asset=None, tag=None, cache=None,
suppress_exceptions=True):
"""
Returns the instance of a instantiated configuration plugin based on
the provided Server URL. If the url fails to be parsed, then None
the provided Config URL. If the url fails to be parsed, then None
is returned.
"""
@ -210,6 +242,10 @@ class AppriseConfig(object):
results['asset'] = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
if cache is not None:
# Force an over-ride of the cache value to what we have specified
results['cache'] = cache
if suppress_exceptions:
try:
# Attempt to create an instance of our plugin using the parsed
@ -261,10 +297,11 @@ class AppriseConfig(object):
# If we reach here, then we indexed out of range
raise IndexError('list index out of range')
def pop(self, index):
def pop(self, index=-1):
"""
Removes an indexed Apprise Configuration from the stack and
returns it.
Removes an indexed Apprise Configuration from the stack and returns it.
By default, the last element is removed from the list
"""
# Remove our entry
return self.configs.pop(index)

View File

@ -109,12 +109,14 @@ class AttachBase(URLBase):
# Absolute path to attachment
self.download_path = None
# Set our cache flag
# it can be True, or an integer
# Set our cache flag; it can be True or a (positive) integer
try:
self.cache = cache if isinstance(cache, bool) else int(cache)
if self.cache < 0:
raise ValueError()
err = 'A negative cache value ({}) was specified.'.format(
cache)
self.logger.warning(err)
raise TypeError(err)
except (ValueError, TypeError):
err = 'An invalid cache value ({}) was specified.'.format(cache)
@ -212,7 +214,8 @@ class AttachBase(URLBase):
if self.download_path and os.path.isfile(self.download_path) \
and self.cache:
# We have enough reason to look further into our cached value
# We have enough reason to look further into our cached content
# and verify it has not expired.
if self.cache is True:
# return our fixed content as is; we will always cache it
return True

View File

@ -27,6 +27,7 @@ import os
import re
import six
import yaml
import time
from .. import plugins
from ..AppriseAsset import AppriseAsset
@ -35,6 +36,7 @@ from ..common import ConfigFormat
from ..common import CONFIG_FORMATS
from ..utils import GET_SCHEMA_RE
from ..utils import parse_list
from ..utils import parse_bool
class ConfigBase(URLBase):
@ -58,16 +60,31 @@ class ConfigBase(URLBase):
# anything else. 128KB (131072B)
max_buffer_size = 131072
def __init__(self, **kwargs):
def __init__(self, cache=True, **kwargs):
"""
Initialize some general logging and common server arguments that will
keep things consistent when working with the configurations that
inherit this class.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. For local file references
this makes no difference at all. But for remote content, this does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled. Only disable caching
if you understand the consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
"""
super(ConfigBase, self).__init__(**kwargs)
# Tracks the time the content was last retrieved on. This place a role
# for cases where we are not caching our response and are required to
# re-retrieve our settings.
self._cached_time = None
# Tracks previously loaded content for speed
self._cached_servers = None
@ -86,20 +103,34 @@ class ConfigBase(URLBase):
self.logger.warning(err)
raise TypeError(err)
# Set our cache flag; it can be True or a (positive) integer
try:
self.cache = cache if isinstance(cache, bool) else int(cache)
if self.cache < 0:
err = 'A negative cache value ({}) was specified.'.format(
cache)
self.logger.warning(err)
raise TypeError(err)
except (ValueError, TypeError):
err = 'An invalid cache value ({}) was specified.'.format(cache)
self.logger.warning(err)
raise TypeError(err)
return
def servers(self, asset=None, cache=True, **kwargs):
def servers(self, asset=None, **kwargs):
"""
Performs reads loaded configuration and returns all of the services
that could be parsed and loaded.
"""
if cache is True and isinstance(self._cached_servers, list):
if not self.expired():
# We already have cached results to return; use them
return self._cached_servers
# Our response object
# Our cached response object
self._cached_servers = list()
# read() causes the child class to do whatever it takes for the
@ -107,8 +138,11 @@ class ConfigBase(URLBase):
# None is returned if there was an error or simply no data
content = self.read(**kwargs)
if not isinstance(content, six.string_types):
# Nothing more to do
return list()
# Set the time our content was cached at
self._cached_time = time.time()
# Nothing more to do; return our empty cache list
return self._cached_servers
# Our Configuration format uses a default if one wasn't one detected
# or enfored.
@ -129,6 +163,9 @@ class ConfigBase(URLBase):
self.logger.warning('Failed to load configuration from {}'.format(
self.url()))
# Set the time our content was cached at
self._cached_time = time.time()
return self._cached_servers
def read(self):
@ -138,13 +175,35 @@ class ConfigBase(URLBase):
"""
return None
def expired(self):
"""
Simply returns True if the configuration should be considered
as expired or False if content should be retrieved.
"""
if isinstance(self._cached_servers, list) and self.cache:
# We have enough reason to look further into our cached content
# and verify it has not expired.
if self.cache is True:
# we have not expired, return False
return False
# Verify our cache time to determine whether we will get our
# content again.
age_in_sec = time.time() - self._cached_time
if age_in_sec <= self.cache:
# We have not expired; return False
return False
# If we reach here our configuration should be considered
# missing and/or expired.
return True
@staticmethod
def parse_url(url, verify_host=True):
"""Parses the URL and returns it broken apart into a dictionary.
This is very specific and customized for Apprise.
Args:
url (str): The URL you want to fully parse.
verify_host (:obj:`bool`, optional): a flag kept with the parsed
@ -177,6 +236,17 @@ class ConfigBase(URLBase):
if 'encoding' in results['qsd']:
results['encoding'] = results['qsd'].get('encoding')
# Our cache value
if 'cache' in results['qsd']:
# First try to get it's integer value
try:
results['cache'] = int(results['qsd']['cache'])
except (ValueError, TypeError):
# No problem, it just isn't an integer; now treat it as a bool
# instead:
results['cache'] = parse_bool(results['qsd']['cache'])
return results
@staticmethod
@ -542,15 +612,16 @@ class ConfigBase(URLBase):
return response
def pop(self, index):
def pop(self, index=-1):
"""
Removes an indexed Notification Service from the stack and
returns it.
Removes an indexed Notification Service from the stack and returns it.
By default, the last element of the list is removed.
"""
if not isinstance(self._cached_servers, list):
# Generate ourselves a list of content we can pull from
self.servers(cache=True)
self.servers()
# Pop the element off of the stack
return self._cached_servers.pop(index)
@ -562,7 +633,7 @@ class ConfigBase(URLBase):
"""
if not isinstance(self._cached_servers, list):
# Generate ourselves a list of content we can pull from
self.servers(cache=True)
self.servers()
return self._cached_servers[index]
@ -572,7 +643,7 @@ class ConfigBase(URLBase):
"""
if not isinstance(self._cached_servers, list):
# Generate ourselves a list of content we can pull from
self.servers(cache=True)
self.servers()
return iter(self._cached_servers)
@ -582,6 +653,28 @@ class ConfigBase(URLBase):
"""
if not isinstance(self._cached_servers, list):
# Generate ourselves a list of content we can pull from
self.servers(cache=True)
self.servers()
return len(self._cached_servers)
def __bool__(self):
"""
Allows the Apprise object to be wrapped in an Python 3.x based 'if
statement'. True is returned if our content was downloaded correctly.
"""
if not isinstance(self._cached_servers, list):
# Generate ourselves a list of content we can pull from
self.servers()
return True if self._cached_servers else False
def __nonzero__(self):
"""
Allows the Apprise object to be wrapped in an Python 2.x based 'if
statement'. True is returned if our content was downloaded correctly.
"""
if not isinstance(self._cached_servers, list):
# Generate ourselves a list of content we can pull from
self.servers()
return True if self._cached_servers else False

View File

@ -26,9 +26,9 @@
import re
import io
import os
from os.path import expanduser
from .ConfigBase import ConfigBase
from ..common import ConfigFormat
from ..AppriseLocale import gettext_lazy as _
class ConfigFile(ConfigBase):
@ -36,8 +36,8 @@ class ConfigFile(ConfigBase):
A wrapper for File based configuration sources
"""
# The default descriptive name associated with the Notification
service_name = 'Local File'
# The default descriptive name associated with the service
service_name = _('Local File')
# The default protocol
protocol = 'file'
@ -53,7 +53,7 @@ class ConfigFile(ConfigBase):
super(ConfigFile, self).__init__(**kwargs)
# Store our file path as it was set
self.path = path
self.path = os.path.expanduser(path)
return
@ -62,18 +62,26 @@ class ConfigFile(ConfigBase):
Returns the URL built dynamically based on specified arguments.
"""
# Prepare our cache value
if isinstance(self.cache, bool) or not self.cache:
cache = 'yes' if self.cache else 'no'
else:
cache = int(self.cache)
# Define any arguments set
args = {
'encoding': self.encoding,
'cache': cache,
}
if self.config_format:
# A format was enforced; make sure it's passed back with the url
args['format'] = self.config_format
return 'file://{path}?{args}'.format(
return 'file://{path}{args}'.format(
path=self.quote(self.path),
args=self.urlencode(args),
args='?{}'.format(self.urlencode(args)) if args else '',
)
def read(self, **kwargs):
@ -159,5 +167,5 @@ class ConfigFile(ConfigBase):
if not match:
return None
results['path'] = expanduser(ConfigFile.unquote(match.group('path')))
results['path'] = ConfigFile.unquote(match.group('path'))
return results

View File

@ -29,6 +29,7 @@ import requests
from .ConfigBase import ConfigBase
from ..common import ConfigFormat
from ..URLBase import PrivacyMode
from ..AppriseLocale import gettext_lazy as _
# Support YAML formats
# text/yaml
@ -48,8 +49,8 @@ class ConfigHTTP(ConfigBase):
A wrapper for HTTP based configuration sources
"""
# The default descriptive name associated with the Notification
service_name = 'HTTP'
# The default descriptive name associated with the service
service_name = _('Web Based')
# The default protocol
protocol = 'http'
@ -95,9 +96,18 @@ class ConfigHTTP(ConfigBase):
Returns the URL built dynamically based on specified arguments.
"""
# Prepare our cache value
if isinstance(self.cache, bool) or not self.cache:
cache = 'yes' if self.cache else 'no'
else:
cache = int(self.cache)
# Define any arguments set
args = {
'verify': 'yes' if self.verify_certificate else 'no',
'encoding': self.encoding,
'cache': cache,
}
if self.config_format:
@ -122,12 +132,13 @@ class ConfigHTTP(ConfigBase):
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/?{args}'.format(
return '{schema}://{auth}{hostname}{port}{fullpath}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.host,
hostname=self.quote(self.host, safe=''),
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=self.quote(self.fullpath, safe='/'),
args=self.urlencode(args),
)
@ -169,61 +180,48 @@ class ConfigHTTP(ConfigBase):
try:
# Make our request
r = requests.post(
url,
headers=headers,
auth=auth,
verify=self.verify_certificate,
timeout=self.connection_timeout_sec,
stream=True,
)
with requests.post(
url,
headers=headers,
auth=auth,
verify=self.verify_certificate,
timeout=self.connection_timeout_sec,
stream=True) as r:
if r.status_code != requests.codes.ok:
status_str = \
ConfigBase.http_response_code_lookup(r.status_code)
self.logger.error(
'Failed to get HTTP configuration: '
'{}{} error={}.'.format(
status_str,
',' if status_str else '',
r.status_code))
# Handle Errors
r.raise_for_status()
# Display payload for debug information only; Don't read any
# more than the first X bytes since we're potentially accessing
# content from untrusted servers.
if self.max_error_buffer_size > 0:
self.logger.debug(
'Response Details:\r\n{}'.format(
r.content[0:self.max_error_buffer_size]))
# Get our file-size (if known)
try:
file_size = int(r.headers.get('Content-Length', '0'))
except (TypeError, ValueError):
# Handle edge case where Content-Length is a bad value
file_size = 0
# Close out our connection if it exists to eliminate any
# potential inefficiencies with the Request connection pool as
# documented on their site when using the stream=True option.
r.close()
# Store our response
if self.max_buffer_size > 0 \
and file_size > self.max_buffer_size:
# Return None (signifying a failure)
return None
# Provide warning of data truncation
self.logger.error(
'HTTP config response exceeds maximum buffer length '
'({}KB);'.format(int(self.max_buffer_size / 1024)))
# Store our response
if self.max_buffer_size > 0 and \
r.headers['Content-Length'] > self.max_buffer_size:
# Return None - buffer execeeded
return None
# Provide warning of data truncation
self.logger.error(
'HTTP config response exceeds maximum buffer length '
'({}KB);'.format(int(self.max_buffer_size / 1024)))
# Store our result (but no more than our buffer length)
response = r.content[:self.max_buffer_size + 1]
# Close out our connection if it exists to eliminate any
# potential inefficiencies with the Request connection pool as
# documented on their site when using the stream=True option.
r.close()
# Verify that our content did not exceed the buffer size:
if len(response) > self.max_buffer_size:
# Provide warning of data truncation
self.logger.error(
'HTTP config response exceeds maximum buffer length '
'({}KB);'.format(int(self.max_buffer_size / 1024)))
# Return None - buffer execeeded
return None
else:
# Store our result
response = r.content
# Return None - buffer execeeded
return None
# Detect config format based on mime if the format isn't
# already enforced
@ -249,11 +247,6 @@ class ConfigHTTP(ConfigBase):
# Return None (signifying a failure)
return None
# Close out our connection if it exists to eliminate any potential
# inefficiencies with the Request connection pool as documented on
# their site when using the stream=True option.
r.close()
# Return our response object
return response

View File

@ -43,7 +43,7 @@ def __load_matrix(path=abspath(dirname(__file__)), name='apprise.config'):
skip over modules we simply don't have the dependencies for.
"""
# Used for the detection of additional Notify Services objects
# Used for the detection of additional Configuration Services objects
# The .py extension is optional as we support loading directories too
module_re = re.compile(r'^(?P<name>Config[a-z0-9]+)(\.py)?$', re.I)

View File

@ -26,6 +26,7 @@
import six
import io
import mock
import pytest
from apprise import NotifyFormat
from apprise.Apprise import Apprise
from apprise.AppriseConfig import AppriseConfig
@ -212,9 +213,6 @@ def test_apprise_config(tmpdir):
# above, our results have been cached so we get a 1 response.
assert len(ac.servers()) == 1
# Now do the same check but force a flushed cache
assert len(ac.servers(cache=False)) == 0
def test_apprise_multi_config_entries(tmpdir):
"""
@ -311,7 +309,7 @@ def test_apprise_config_tagging(tmpdir):
assert len(ac.servers(tag='all')) == 3
def test_apprise_instantiate():
def test_apprise_config_instantiate():
"""
API: AppriseConfig.instantiate()
@ -332,15 +330,9 @@ def test_apprise_instantiate():
# Store our bad configuration in our schema map
CONFIG_SCHEMA_MAP['bad'] = BadConfig
try:
with pytest.raises(TypeError):
AppriseConfig.instantiate(
'bad://path', suppress_exceptions=False)
# We should never make it to this line
assert False
except TypeError:
# Exception caught as expected
assert True
# Same call but exceptions suppressed
assert AppriseConfig.instantiate(
@ -378,7 +370,7 @@ def test_apprise_config_with_apprise_obj(tmpdir):
# Store our good notification in our schema map
NOTIFY_SCHEMA_MAP['good'] = GoodNotification
# Create ourselves a config object
# Create ourselves a config object with caching disbled
ac = AppriseConfig(cache=False)
# Nothing loaded yet
@ -587,7 +579,7 @@ def test_apprise_config_matrix_load():
# protocol as string
protocol = 'true'
# Generate ourselfs a fake entry
# Generate ourselves a fake entry
apprise.config.ConfigDummy = ConfigDummy
apprise.config.ConfigDummy2 = ConfigDummy2
apprise.config.ConfigDummy3 = ConfigDummy3

View File

@ -651,10 +651,10 @@ class ConfigGoober(ConfigBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by ConfigMail)
# The default simple (insecure) protocol
protocol = 'http'
# The default secure protocol (used by ConfigMail)
# The default secure protocol
secure_protocol = 'https'""")
# Utilizes a schema:// already occupied (as tuple)
@ -664,11 +664,10 @@ class ConfigBugger(ConfigBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol (used by ConfigMail), the other
# isn't
# The default simple (insecure) protocol
protocol = ('http', 'bugger-test' )
# The default secure protocol (used by ConfigMail), the other isn't
# The default secure protocol
secure_protocol = ('https', 'bugger-tests')""")
__load_matrix(path=str(base), name=module_name)

View File

@ -45,7 +45,7 @@ def test_config_file(tmpdir):
t = tmpdir.mkdir("testing").join("apprise")
t.write("gnome://")
assert ConfigFile.parse_url('file://?'.format(str(t))) is None
assert ConfigFile.parse_url('file://?') is None
# Initialize our object
cf = ConfigFile(path=str(t), format='text')
@ -83,9 +83,16 @@ def test_config_file(tmpdir):
# Second reference actually uses cache
iter(cf)
# Cache Handling; cache each request for 30 seconds
results = ConfigFile.parse_url(
'file://{}?cache=30'.format(str(t)))
assert isinstance(results, dict)
cf = ConfigFile(**results)
assert isinstance(cf.url(), six.string_types) is True
assert isinstance(cf.read(), six.string_types) is True
@mock.patch('io.open')
def test_config_file_exceptions(mock_open, tmpdir):
def test_config_file_exceptions(tmpdir):
"""
API: ConfigFile() i/o exception handling
@ -95,10 +102,17 @@ def test_config_file_exceptions(mock_open, tmpdir):
t = tmpdir.mkdir("testing").join("apprise")
t.write("gnome://")
mock_open.side_effect = OSError
# Initialize our object
cf = ConfigFile(path=str(t), format='text')
# Internal Exception would have been thrown and this would fail
with mock.patch('io.open', side_effect=OSError):
assert cf.read() is None
# handle case where the file is to large for what was expected:
max_buffer_size = cf.max_buffer_size
cf.max_buffer_size = 1
assert cf.read() is None
# Restore default value
cf.max_buffer_size = max_buffer_size

View File

@ -24,6 +24,8 @@
# THE SOFTWARE.
import six
import time
import pytest
import mock
import requests
from apprise.common import ConfigFormat
@ -51,9 +53,8 @@ REQUEST_EXCEPTIONS = (
)
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_config_http(mock_post, mock_get):
def test_config_http(mock_post):
"""
API: ConfigHTTP() object
@ -75,20 +76,39 @@ def test_config_http(mock_post, mock_get):
# Store our good notification in our schema map
SCHEMA_MAP['good'] = GoodNotification
# Prepare Mock
dummy_request = mock.Mock()
dummy_request.close.return_value = True
dummy_request.status_code = requests.codes.ok
dummy_request.content = """
taga,tagb=good://server01
"""
dummy_request.headers = {
'Content-Length': len(dummy_request.content),
'Content-Type': 'text/plain',
}
# Our default content
default_content = """taga,tagb=good://server01"""
mock_post.return_value = dummy_request
mock_get.return_value = dummy_request
class DummyResponse(object):
"""
A dummy response used to manage our object
"""
status_code = requests.codes.ok
headers = {
'Content-Length': len(default_content),
'Content-Type': 'text/plain',
}
content = default_content
# Pointer to file
ptr = None
def close(self):
return
def raise_for_status(self):
return
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
return
# Prepare Mock
dummy_response = DummyResponse()
mock_post.return_value = dummy_response
assert ConfigHTTP.parse_url('garbage://') is None
@ -110,6 +130,86 @@ def test_config_http(mock_post, mock_get):
# one entry added
assert len(ch) == 1
# Clear all our mock counters
mock_post.reset_mock()
# Cache Handling; cache each request for 30 seconds
results = ConfigHTTP.parse_url('http://localhost:8080/path/?cache=30')
assert mock_post.call_count == 0
assert isinstance(ch.url(), six.string_types) is True
assert isinstance(results, dict)
ch = ConfigHTTP(**results)
assert mock_post.call_count == 0
assert isinstance(ch.url(), six.string_types) is True
assert mock_post.call_count == 0
assert isinstance(ch.read(), six.string_types) is True
assert mock_post.call_count == 1
# Clear all our mock counters
mock_post.reset_mock()
# Behind the scenes we haven't actually made a fetch yet. We can consider
# our content expired at this point
assert ch.expired() is True
# Test using boolean check; this will force a remote fetch
assert ch
# Now a call was made
assert mock_post.call_count == 1
mock_post.reset_mock()
# Our content hasn't expired yet (it's good for 30 seconds)
assert ch.expired() is False
assert len(ch) == 1
assert mock_post.call_count == 0
# Test using boolean check; we will re-use our cache and not
# make another remote request
mock_post.reset_mock()
assert ch
assert len(ch.servers()) == 1
assert len(ch) == 1
# No remote post has been made
assert mock_post.call_count == 0
with mock.patch('time.time', return_value=time.time() + 10):
# even with 10 seconds elapsed, no fetch will be made
assert ch.expired() is False
assert ch
assert len(ch.servers()) == 1
assert len(ch) == 1
# No remote post has been made
assert mock_post.call_count == 0
with mock.patch('time.time', return_value=time.time() + 31):
# but 30+ seconds from now is considered expired
assert ch.expired() is True
assert ch
assert len(ch.servers()) == 1
assert len(ch) == 1
# Our content would have been renewed with a single new fetch
assert mock_post.call_count == 1
# one entry added
assert len(ch) == 1
# Invalid cache
results = ConfigHTTP.parse_url('http://localhost:8080/path/?cache=False')
assert isinstance(results, dict)
assert isinstance(ch.url(), six.string_types) is True
results = ConfigHTTP.parse_url('http://localhost:8080/path/?cache=-10')
assert isinstance(results, dict)
with pytest.raises(TypeError):
ch = ConfigHTTP(**results)
results = ConfigHTTP.parse_url('http://user@localhost?format=text')
assert isinstance(results, dict)
ch = ConfigHTTP(**results)
@ -157,7 +257,7 @@ def test_config_http(mock_post, mock_get):
iter(ch)
# Test a buffer size limit reach
ch.max_buffer_size = len(dummy_request.content)
ch.max_buffer_size = len(dummy_response.content)
assert isinstance(ch.read(), six.string_types) is True
# Test YAML detection
@ -165,7 +265,7 @@ def test_config_http(mock_post, mock_get):
'text/yaml', 'text/x-yaml', 'application/yaml', 'application/x-yaml')
for st in yaml_supported_types:
dummy_request.headers['Content-Type'] = st
dummy_response.headers['Content-Type'] = st
ch.default_config_format = None
assert isinstance(ch.read(), six.string_types) is True
# Set to YAML
@ -175,7 +275,7 @@ def test_config_http(mock_post, mock_get):
text_supported_types = ('text/plain', 'text/html')
for st in text_supported_types:
dummy_request.headers['Content-Type'] = st
dummy_response.headers['Content-Type'] = st
ch.default_config_format = None
assert isinstance(ch.read(), six.string_types) is True
# Set to TEXT
@ -185,27 +285,52 @@ def test_config_http(mock_post, mock_get):
ukwn_supported_types = ('text/css', 'application/zip')
for st in ukwn_supported_types:
dummy_request.headers['Content-Type'] = st
dummy_response.headers['Content-Type'] = st
ch.default_config_format = None
assert isinstance(ch.read(), six.string_types) is True
# Remains unchanged
assert ch.default_config_format is None
# When the entry is missing; we handle this too
del dummy_request.headers['Content-Type']
del dummy_response.headers['Content-Type']
ch.default_config_format = None
assert isinstance(ch.read(), six.string_types) is True
# Remains unchanged
assert ch.default_config_format is None
# Restore our content type object for lower tests
dummy_request.headers['Content-Type'] = 'text/plain'
dummy_response.headers['Content-Type'] = 'text/plain'
ch.max_buffer_size = len(dummy_request.content) - 1
# Take a snapshot
max_buffer_size = ch.max_buffer_size
ch.max_buffer_size = len(dummy_response.content) - 1
assert ch.read() is None
# Restore buffer size count
ch.max_buffer_size = max_buffer_size
# Test erroneous Content-Length
# Our content is still within the limits, so we're okay
dummy_response.headers['Content-Length'] = 'garbage'
assert isinstance(ch.read(), six.string_types) is True
dummy_response.headers['Content-Length'] = 'None'
# Our content is still within the limits, so we're okay
assert isinstance(ch.read(), six.string_types) is True
# Handle cases where the content length is exactly at our limit
dummy_response.content = 'a' * ch.max_buffer_size
# This is acceptable
assert isinstance(ch.read(), six.string_types) is True
# If we are over our limit though..
dummy_response.content = 'b' * (ch.max_buffer_size + 1)
assert ch.read() is None
# Test an invalid return code
dummy_request.status_code = 400
dummy_response.status_code = 400
assert ch.read() is None
ch.max_error_buffer_size = 0
assert ch.read() is None
@ -213,5 +338,7 @@ def test_config_http(mock_post, mock_get):
# Exception handling
for _exception in REQUEST_EXCEPTIONS:
mock_post.side_effect = _exception
mock_get.side_effect = _exception
assert ch.read() is None
# Restore buffer size count
ch.max_buffer_size = max_buffer_size