config files now correctly parse native urls (#164)

This commit is contained in:
Chris Caron 2019-10-13 14:38:18 -04:00 committed by GitHub
parent ccc9aba55a
commit 82683230a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 103 additions and 118 deletions

View File

@ -34,7 +34,6 @@ from .common import MATCH_ALL_TAG
from .utils import is_exclusive_match
from .utils import parse_list
from .utils import split_urls
from .utils import GET_SCHEMA_RE
from .logger import logger
from .AppriseAsset import AppriseAsset
@ -108,38 +107,8 @@ class Apprise(object):
results = None
if isinstance(url, six.string_types):
# swap hash (#) tag values with their html version
_url = url.replace('/#', '/%23')
# Attempt to acquire the schema at the very least to allow our
# plugins to determine if they can make a better interpretation of
# a URL geared for them
schema = GET_SCHEMA_RE.match(_url)
if schema is None:
logger.error(
'Unparseable schema:// found in URL {}.'.format(url))
return None
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
# Some basic validation
if schema not in plugins.SCHEMA_MAP:
# Give the user the benefit of the doubt that the user may be
# using one of the URLs provided to them by their notification
# service. Before we fail for good, just scan all the plugins
# that support the native_url() parse function
results = \
next((r['plugin'].parse_native_url(_url)
for r in plugins.MODULE_MAP.values()
if r['plugin'].parse_native_url(_url) is not None),
None)
else:
# Parse our url details of the server object as dictionary
# containing all of the information parsed from our URL
results = plugins.SCHEMA_MAP[schema].parse_url(_url)
# Acquire our url tokens
results = plugins.url_to_dict(url)
if results is None:
# Failed to parse the server URL
logger.error('Unparseable URL {}.'.format(url))

View File

@ -236,35 +236,14 @@ class ConfigBase(URLBase):
# otherwise.
return list()
if not result.group('url'):
# Store our url read in
url = result.group('url')
if not url:
# Comment/empty line; do nothing
continue
# Store our url read in
url = result.group('url')
# swap hash (#) tag values with their html version
_url = url.replace('/#', '/%23')
# Attempt to acquire the schema at the very least to allow our
# plugins to determine if they can make a better
# interpretation of a URL geared for them
schema = GET_SCHEMA_RE.match(_url)
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
# Some basic validation
if schema not in plugins.SCHEMA_MAP:
ConfigBase.logger.warning(
'Unsupported schema {} on line {}.'.format(
schema, line))
continue
# Parse our url details of the server object as dictionary
# containing all of the information parsed from our URL
results = plugins.SCHEMA_MAP[schema].parse_url(_url)
# Acquire our url tokens
results = plugins.url_to_dict(url)
if results is None:
# Failed to parse the server URL
ConfigBase.logger.warning(
@ -407,63 +386,46 @@ class ConfigBase(URLBase):
results = list()
if isinstance(url, six.string_types):
# We're just a simple URL string
# swap hash (#) tag values with their html version
_url = url.replace('/#', '/%23')
# Attempt to acquire the schema at the very least to allow our
# plugins to determine if they can make a better
# interpretation of a URL geared for them
schema = GET_SCHEMA_RE.match(_url)
# We're just a simple URL string...
schema = GET_SCHEMA_RE.match(url)
if schema is None:
# Log invalid entries so that the maintainer of the
# config file at least has something to take action
# with.
ConfigBase.logger.warning(
'Ignored entry {} found under urls, entry #{}'
.format(_url, no + 1))
'Invalid URL {}, entry #{}'.format(url, no + 1))
continue
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
# Some basic validation
if schema not in plugins.SCHEMA_MAP:
ConfigBase.logger.warning(
'Unsupported schema {} under urls, entry #{}'.format(
schema, no + 1))
continue
# Parse our url details of the server object as dictionary
# containing all of the information parsed from our URL
_results = plugins.SCHEMA_MAP[schema].parse_url(_url)
# We found a valid schema worthy of tracking; store it's
# details:
_results = plugins.url_to_dict(url)
if _results is None:
ConfigBase.logger.warning(
'Unparseable {} based url, entry #{}'.format(
schema, no + 1))
'Unparseable URL {}, entry #{}'.format(
url, no + 1))
continue
# add our results to our global set
results.append(_results)
elif isinstance(url, dict):
# We are a url string with additional unescaped options
# We are a url string with additional unescaped options. In
# this case we want to iterate over all of our options so we
# can at least tell the end user what entries were ignored
# due to errors
if six.PY2:
it = url.iteritems()
else: # six.PY3
it = iter(url.items())
# Track whether a schema was found
schema = None
# Track the URL to-load
_url = None
for _key, tokens in it:
# swap hash (#) tag values with their html version
key = _key.replace('/#', '/%23')
# Get our schema
# Track last acquired schema
schema = None
for key, tokens in it:
# Test our schema
_schema = GET_SCHEMA_RE.match(key)
if _schema is None:
# Log invalid entries so that maintainer of config
@ -471,32 +433,22 @@ class ConfigBase(URLBase):
# with.
ConfigBase.logger.warning(
'Ignored entry {} found under urls, entry #{}'
.format(_key, no + 1))
.format(key, no + 1))
continue
# Store our URL and Schema Regex
_url = key
schema = _schema
if schema is None:
# Store our schema
schema = _schema.group('schema').lower()
if _url is None:
# the loop above failed to match anything
ConfigBase.logger.warning(
'Unsupported schema in urls, entry #{}'.format(no + 1))
continue
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
# Some basic validation
if schema not in plugins.SCHEMA_MAP:
ConfigBase.logger.warning(
'Unsupported schema {} in urls, entry #{}'.format(
schema, no + 1))
continue
# Parse our url details of the server object as dictionary
# containing all of the information parsed from our URL
_results = plugins.SCHEMA_MAP[schema].parse_url(_url)
_results = plugins.url_to_dict(_url)
if _results is None:
# Setup dictionary
_results = {

View File

@ -43,6 +43,7 @@ from ..common import NOTIFY_IMAGE_SIZES
from ..common import NotifyType
from ..common import NOTIFY_TYPES
from ..utils import parse_list
from ..utils import GET_SCHEMA_RE
from ..AppriseLocale import gettext_lazy as _
from ..AppriseLocale import LazyTranslation
@ -57,6 +58,9 @@ __all__ = [
# NotifyEmail Base Module (used for NotifyEmail testing)
'NotifyEmailBase',
# Tokenizer
'url_to_dict',
# gntp (used for NotifyGrowl Testing)
'gntp',
]
@ -414,3 +418,47 @@ def details(plugin):
'args': template_args,
'kwargs': template_kwargs,
}
def url_to_dict(url):
    """
    Takes an Apprise URL and returns the tokens associated with it
    if they can be acquired based on the plugins available.

    None is returned if the URL could not be parsed, otherwise the
    tokens are returned.

    These tokens can be loaded into apprise through its add()
    function.
    """

    # swap hash (#) tag values with their html version
    _url = url.replace('/#', '/%23')

    # Attempt to acquire the schema at the very least to allow our plugins to
    # determine if they can make a better interpretation of a URL geared for
    # them.
    schema = GET_SCHEMA_RE.match(_url)
    if schema is None:
        # Not a valid URL; take an early exit
        return None

    # Ensure our schema is always in lower case
    schema = schema.group('schema').lower()
    if schema not in SCHEMA_MAP:
        # Give the user the benefit of the doubt that the user may be using
        # one of the URLs provided to them by their notification service.
        # Before we fail for good, just scan all the plugins that support the
        # native_url() parse function.
        results = None
        for entry in MODULE_MAP.values():
            # parse_native_url() returns None when the plugin cannot
            # interpret the URL.  Call it once per plugin and stop at the
            # first match; the previous next()/generator idiom invoked it
            # twice per matching plugin (filter + yield), doubling the work.
            results = entry['plugin'].parse_native_url(_url)
            if results is not None:
                break

    else:
        # Parse our url details of the server object as dictionary
        # containing all of the information parsed from our URL
        results = SCHEMA_MAP[schema].parse_url(_url)

    # Return our results
    return results

View File

@ -111,19 +111,26 @@ def test_config_base_config_parse_text():
# A comment line over top of a URL
mailto://userb:pass@gmail.com
# Test a URL using it's native format; in this case Ryver
https://apprise.ryver.com/application/webhook/ckhrjW8w672m6HG
# Invalid URL as it's not associated with a plugin
# or a native url
https://not.a.native.url/
# A line with mulitiple tag assignments to it
taga,tagb=kde://
""", asset=AppriseAsset())
# We expect to parse 2 entries from the above
# We expect to parse 3 entries from the above
assert isinstance(result, list)
assert len(result) == 2
assert len(result) == 3
assert len(result[0].tags) == 0
# Our second element will have tags associated with it
assert len(result[1].tags) == 2
assert 'taga' in result[1].tags
assert 'tagb' in result[1].tags
# Our last element will have 2 tags associated with it
assert len(result[-1].tags) == 2
assert 'taga' in result[-1].tags
assert 'tagb' in result[-1].tags
# Here is a similar result set however this one has an invalid line
# in it which invalidates the entire file
@ -347,11 +354,14 @@ version: 1
urls:
- pbul://o.gn5kj6nfhv736I7jC3cj3QLRiyhgl98b
- mailto://test:password@gmail.com
- https://apprise.ryver.com/application/webhook/ckhrjW8w672m6HG
- https://not.a.native.url/
""", asset=asset)
# We expect to parse 2 entries from the above
# We expect to parse 3 entries from the above
# The Ryver one is in a native form and the 4th one is invalid
assert isinstance(result, list)
assert len(result) == 2
assert len(result) == 3
assert len(result[0].tags) == 0
# Valid Configuration
@ -372,6 +382,12 @@ urls:
# we'll accept it
- mailto://oscar:pass@gmail.com:
# A Ryver URL (using Native format); still accepted
- https://apprise.ryver.com/application/webhook/ckhrjW8w672m6HG:
# An invalid URL with colon (ignored)
- https://not.a.native.url/:
# A telegram entry (returns a None in parse_url())
- tgram://invalid
@ -380,7 +396,7 @@ urls:
# We expect to parse 4 entries from the above because the tgram:// entry
# would have failed to be loaded
assert isinstance(result, list)
assert len(result) == 4
assert len(result) == 5
assert len(result[0].tags) == 2
# Global Tags