Exclusive Apprise URL Tag matching (#154)

This commit is contained in:
Chris Caron 2019-09-29 18:19:55 -04:00 committed by GitHub
parent 1d84f7fd8a
commit 7f60fff521
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 468 additions and 152 deletions

View File

@ -148,6 +148,35 @@ apprise -t 'my title' -b 'my notification body' \
apprise -t 'my title' -b 'my notification body' \
--config=/path/to/my/config.yml \
--config=https://localhost/my/apprise/config
```
If you just want to see the Apprise services loaded out of all of your configuration files but not actually perform the notification itself, you can use the **--dry-run** (**-d**) switch. This will just print the notifications that would have otherwise been notified with the same action and exits. The best part is that the output is carefully formatted to hide private elements such as your password, API Keys, and tokens from prying eyes.
```bash
# Use --dry-run (or -d) to just print out the match
# notifications; but not actually perform the notification
# itself.
apprise --dry-run \
--config=/path/to/my/config.yml \
--config=https://localhost/my/apprise/config
# If your notifications are hidden behind tags, then the above
# command may not output anything. In this case, just add the
# --tag=all switch. 'all' matches against everything!
apprise --dry-run \
--config=/path/to/my/config.yml \
--config=https://localhost/my/apprise/config \
--tag=all
# Use the --dry-run with --tag (assuming you tagged your URLs)
# to find the perfect trigger. If your configuration files have
# all loaded from their default locations, then the command
# can be as simple as this:
apprise --dry-run --tag=friends
# When you're happy, just drop the --dry-run to send off all of
# your notifications:
apprise --tag=friends
```
## Developers

View File

@ -30,6 +30,7 @@ from markdown import markdown
from itertools import chain
from .common import NotifyType
from .common import NotifyFormat
from .common import MATCH_ALL_TAG
from .utils import is_exclusive_match
from .utils import parse_list
from .utils import split_urls
@ -273,30 +274,12 @@ class Apprise(object):
"""
self.servers[:] = []
def notify(self, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=None):
def find(self, tag=MATCH_ALL_TAG):
"""
Send a notification to all of the plugins previously loaded.
If the body_format specified is NotifyFormat.MARKDOWN, it will
be converted to HTML if the Notification type expects this.
if the tag is specified (either a string or a set/list/tuple
of strings), then only the notifications flagged with that
tagged value are notified. By default all added services
are notified (tag=None)
Returns an list of all servers matching against the tag specified.
"""
# Initialize our return result
status = len(self) > 0
if not (title or body):
return False
# Tracks conversions
conversion_map = dict()
# Build our tag setup
# - top level entries are treated as an 'or'
# - second level (or more) entries are treated as 'and'
@ -319,78 +302,120 @@ class Apprise(object):
for server in servers:
# Apply our tag matching based on our defined logic
if tag is not None and not is_exclusive_match(
logic=tag, data=server.tags):
continue
if is_exclusive_match(
logic=tag, data=server.tags, match_all=MATCH_ALL_TAG):
yield server
return
# If our code reaches here, we either did not define a tag (it
# was set to None), or we did define a tag and the logic above
# determined we need to notify the service it's associated with
if server.notify_format not in conversion_map:
if body_format == NotifyFormat.MARKDOWN and \
server.notify_format == NotifyFormat.HTML:
def notify(self, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=MATCH_ALL_TAG):
"""
Send a notification to all of the plugins previously loaded.
# Apply Markdown
conversion_map[server.notify_format] = markdown(body)
If the body_format specified is NotifyFormat.MARKDOWN, it will
be converted to HTML if the Notification type expects this.
elif body_format == NotifyFormat.TEXT and \
server.notify_format == NotifyFormat.HTML:
if the tag is specified (either a string or a set/list/tuple
of strings), then only the notifications flagged with that
tagged value are notified. By default all added services
are notified (tag=MATCH_ALL_TAG)
# Basic TEXT to HTML format map; supports keys only
re_map = {
# Support Ampersand
r'&': '&',
This function returns True if all notifications were successfully
sent, False if even just one of them fails, and None if no
notifications were sent at all as a result of tag filtering and/or
simply having empty configuration files that were read.
"""
# Spaces to   for formatting purposes since
# multiple spaces are treated as one an this may
# not be the callers intention
r' ': ' ',
if len(self) == 0:
# Nothing to notify
return False
# Tab support
r'\t': '   ',
# Initialize our return result which only turns to True if we send
# at least one valid notification
status = None
# Greater than and Less than Characters
r'>': '>',
r'<': '&lt;',
}
if not (title or body):
return False
# Compile our map
re_table = re.compile(
r'(' + '|'.join(
map(re.escape, re_map.keys())) + r')',
re.IGNORECASE,
)
# Tracks conversions
conversion_map = dict()
# Execute our map against our body in addition to
# swapping out new lines and replacing them with <br/>
conversion_map[server.notify_format] = \
re.sub(r'\r*\n', '<br/>\r\n',
re_table.sub(
lambda x: re_map[x.group()], body))
# Iterate over our loaded plugins
for server in self.find(tag):
if status is None:
# We have at least one server to notify; change status
# to be a default value of True from now (purely an
# initialiation at this point)
status = True
else:
# Store entry directly
conversion_map[server.notify_format] = body
# If our code reaches here, we either did not define a tag (it
# was set to None), or we did define a tag and the logic above
# determined we need to notify the service it's associated with
if server.notify_format not in conversion_map:
if body_format == NotifyFormat.MARKDOWN and \
server.notify_format == NotifyFormat.HTML:
try:
# Send notification
if not server.notify(
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type):
# Apply Markdown
conversion_map[server.notify_format] = markdown(body)
# Toggle our return status flag
status = False
elif body_format == NotifyFormat.TEXT and \
server.notify_format == NotifyFormat.HTML:
except TypeError:
# These our our internally thrown notifications
# Basic TEXT to HTML format map; supports keys only
re_map = {
# Support Ampersand
r'&': '&amp;',
# Spaces to &nbsp; for formatting purposes since
# multiple spaces are treated as one an this may
# not be the callers intention
r' ': '&nbsp;',
# Tab support
r'\t': '&nbsp;&nbsp;&nbsp;',
# Greater than and Less than Characters
r'>': '&gt;',
r'<': '&lt;',
}
# Compile our map
re_table = re.compile(
r'(' + '|'.join(
map(re.escape, re_map.keys())) + r')',
re.IGNORECASE,
)
# Execute our map against our body in addition to
# swapping out new lines and replacing them with <br/>
conversion_map[server.notify_format] = \
re.sub(r'\r*\n', '<br/>\r\n',
re_table.sub(
lambda x: re_map[x.group()], body))
else:
# Store entry directly
conversion_map[server.notify_format] = body
try:
# Send notification
if not server.notify(
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type):
# Toggle our return status flag
status = False
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
status = False
except TypeError:
# These our our internally thrown notifications
status = False
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
status = False
return status

View File

@ -30,6 +30,7 @@ from . import ConfigBase
from . import URLBase
from .AppriseAsset import AppriseAsset
from .common import MATCH_ALL_TAG
from .utils import GET_SCHEMA_RE
from .utils import parse_list
from .utils import is_exclusive_match
@ -133,7 +134,7 @@ class AppriseConfig(object):
# Return our status
return return_status
def servers(self, tag=None, cache=True):
def servers(self, tag=MATCH_ALL_TAG, cache=True):
"""
Returns all of our servers dynamically build based on parsed
configuration.
@ -160,13 +161,11 @@ class AppriseConfig(object):
for entry in self.configs:
# Apply our tag matching based on our defined logic
if tag is not None and not is_exclusive_match(
logic=tag, data=entry.tags):
continue
# Build ourselves a list of services dynamically and return the
# as a list
response.extend(entry.servers(cache=cache))
if is_exclusive_match(
logic=tag, data=entry.tags, match_all=MATCH_ALL_TAG):
# Build ourselves a list of services dynamically and return the
# as a list
response.extend(entry.servers(cache=cache))
return response

View File

@ -111,13 +111,17 @@ def print_version_msg():
'which services to notify. Use multiple --tag (-g) entries to '
'"OR" the tags together and comma separated to "AND" them. '
'If no tags are specified then all services are notified.')
@click.option('-v', '--verbose', count=True)
@click.option('-V', '--version', is_flag=True,
@click.option('--dry-run', '-d', is_flag=True,
help='Perform a trial run but only prints the notification '
'services to-be triggered to the screen instead of actually '
'performing the action.')
@click.option('--verbose', '-v', count=True)
@click.option('--version', '-V', is_flag=True,
help='Display the apprise version and exit.')
@click.argument('urls', nargs=-1,
metavar='SERVER_URL [SERVER_URL2 [SERVER_URL3]]',)
def main(body, title, config, urls, notification_type, theme, tag, verbose,
version):
def main(body, title, config, urls, notification_type, theme, tag, dry_run,
verbose, version):
"""
Send a notification to all of the specified servers identified by their
URLs the content provided within the title, body and notification-type.
@ -184,16 +188,49 @@ def main(body, title, config, urls, notification_type, theme, tag, verbose,
print_help_msg(main)
sys.exit(1)
if body is None:
# if no body was specified, then read from STDIN
body = click.get_text_stream('stdin').read()
# each --tag entry comprises of a comma separated 'and' list
# we or each of of the --tag and sets specified.
tags = None if not tag else [parse_list(t) for t in tag]
# now print it out
if a.notify(
body=body, title=title, notify_type=notification_type, tag=tags):
sys.exit(0)
sys.exit(1)
if not dry_run:
if body is None:
logger.trace('No --body (-b) specified; reading from stdin')
# if no body was specified, then read from STDIN
body = click.get_text_stream('stdin').read()
# now print it out
result = a.notify(
body=body, title=title, notify_type=notification_type, tag=tags)
else:
# Number of rows to assume in the terminal. In future, maybe this can
# be detected and made dynamic. The actual row count is 80, but 5
# characters are already reserved for the counter on the left
rows = 75
# Initialize our URL response; This is populated within the for/loop
# below; but plays a factor at the end when we need to determine if
# we iterated at least once in the loop.
url = None
for idx, server in enumerate(a.find(tag=tags)):
url = server.url(privacy=True)
click.echo("{: 3d}. {}".format(
idx + 1,
url if len(url) <= rows else '{}...'.format(url[:rows - 3])))
# Initialize a default response of nothing matched, otherwise
# if we matched at least one entry, we can return True
result = None if url is None else True
if result is None:
# There were no notifications set. This is a result of just having
# empty configuration files and/or being to restrictive when filtering
# by specific tag(s)
sys.exit(2)
elif result is False:
# At least 1 notification service failed to send
sys.exit(1)
# else: We're good!
sys.exit(0)

View File

@ -128,3 +128,7 @@ CONFIG_FORMATS = (
ConfigFormat.TEXT,
ConfigFormat.YAML,
)
# This is a reserved tag that is automatically assigned to every
# Notification Plugin
MATCH_ALL_TAG = 'all'

View File

@ -316,6 +316,7 @@ class ConfigBase(URLBase):
Optionally associate an asset with the notification.
"""
response = list()
try:
@ -416,8 +417,12 @@ class ConfigBase(URLBase):
# interpretation of a URL geared for them
schema = GET_SCHEMA_RE.match(_url)
if schema is None:
# Log invalid entries so that maintainer of config
# config file at least has something to take action
# with.
ConfigBase.logger.warning(
'Unsupported schema in urls entry #{}'.format(no + 1))
'Ignored entry {} found under urls, entry #{}'
.format(_url, no + 1))
continue
# Ensure our schema is always in lower case
@ -426,7 +431,7 @@ class ConfigBase(URLBase):
# Some basic validation
if schema not in plugins.SCHEMA_MAP:
ConfigBase.logger.warning(
'Unsupported schema {} in urls entry #{}'.format(
'Unsupported schema {} under urls, entry #{}'.format(
schema, no + 1))
continue
@ -435,7 +440,7 @@ class ConfigBase(URLBase):
_results = plugins.SCHEMA_MAP[schema].parse_url(_url)
if _results is None:
ConfigBase.logger.warning(
'Unparseable {} based url; entry #{}'.format(
'Unparseable {} based url, entry #{}'.format(
schema, no + 1))
continue
@ -445,18 +450,38 @@ class ConfigBase(URLBase):
elif isinstance(url, dict):
# We are a url string with additional unescaped options
if six.PY2:
_url, tokens = next(url.iteritems())
it = url.iteritems()
else: # six.PY3
_url, tokens = next(iter(url.items()))
it = iter(url.items())
# swap hash (#) tag values with their html version
_url = _url.replace('/#', '/%23')
# Track whether a schema was found
schema = None
# Track the URL to-load
_url = None
for _key, tokens in it:
# swap hash (#) tag values with their html version
key = _key.replace('/#', '/%23')
# Get our schema
_schema = GET_SCHEMA_RE.match(key)
if _schema is None:
# Log invalid entries so that maintainer of config
# config file at least has something to take action
# with.
ConfigBase.logger.warning(
'Ignored entry {} found under urls, entry #{}'
.format(_key, no + 1))
continue
# Store our URL and Schema Regex
_url = key
schema = _schema
# Get our schema
schema = GET_SCHEMA_RE.match(_url)
if schema is None:
# the loop above failed to match anything
ConfigBase.logger.warning(
'Unsupported schema in urls entry #{}'.format(no + 1))
'Unsupported schema in urls, entry #{}'.format(no + 1))
continue
# Ensure our schema is always in lower case
@ -465,7 +490,7 @@ class ConfigBase(URLBase):
# Some basic validation
if schema not in plugins.SCHEMA_MAP:
ConfigBase.logger.warning(
'Unsupported schema {} in urls entry #{}'.format(
'Unsupported schema {} in urls, entry #{}'.format(
schema, no + 1))
continue
@ -479,7 +504,7 @@ class ConfigBase(URLBase):
'schema': schema,
}
if tokens is not None:
if isinstance(tokens, (list, tuple, set)):
# populate and/or override any results populated by
# parse_url()
for entries in tokens:

View File

@ -529,7 +529,7 @@ def parse_list(*args):
return sorted([x for x in filter(bool, list(set(result)))])
def is_exclusive_match(logic, data):
def is_exclusive_match(logic, data, match_all='all'):
"""
The data variable should always be a set of strings that the logic can be
@ -547,21 +547,22 @@ def is_exclusive_match(logic, data):
logic=[('tagB', 'tagC')] = tagB and tagC
"""
if logic is None:
# If there is no logic to apply then we're done early
return True
elif isinstance(logic, six.string_types):
if isinstance(logic, six.string_types):
# Update our logic to support our delimiters
logic = set(parse_list(logic))
if not logic:
# If there is no logic to apply then we're done early; we only match
# if there is also no data to match against
return not data
if not isinstance(logic, (list, tuple, set)):
# garbage input
return False
# using the data detected; determine if we'll allow the
# notification to be sent or not
matched = (len(logic) == 0)
# Track what we match against; but by default we do not match
# against anything
matched = False
# Every entry here will be or'ed with the next
for entry in logic:
@ -573,7 +574,7 @@ def is_exclusive_match(logic, data):
# must exist in the notification service
entries = set(parse_list(entry))
if len(entries.intersection(data)) == len(entries):
if len(entries.intersection(data.union({match_all}))) == len(entries):
# our set contains all of the entries found
# in our notification data set
matched = True

View File

@ -461,11 +461,12 @@ def test_apprise_tagging(mock_post, mock_get):
assert(a.notify(
title="my title", body="my body", tag=['awesome', 'mmost']) is True)
# there is nothing to notify using tags 'missing'. However we intentionally
# don't fail as there is value in identifying a tag that simply have
# nothing to notify from while the object itself contains items
# When we query against our loaded notifications for a tag that simply
# isn't assigned to anything, we return None. None (different then False)
# tells us that we litterally had nothing to query. We didn't fail...
# but we also didn't do anything...
assert(a.notify(
title="my title", body="my body", tag='missing') is True)
title="my title", body="my body", tag='missing') is None)
# Now to test the ability to and and/or notifications
a = Apprise()
@ -494,10 +495,10 @@ def test_apprise_tagging(mock_post, mock_get):
title="my title", body="my body", tag=[('TagC', 'TagD')]) is True)
# Expression: (TagY and TagZ) or TagX
# Matches nothing
# Matches nothing, None is returned in this case
assert(a.notify(
title="my title", body="my body",
tag=[('TagY', 'TagZ'), 'TagX']) is True)
tag=[('TagY', 'TagZ'), 'TagX']) is None)
# Expression: (TagY and TagZ) or TagA
# Matches the following only:
@ -515,10 +516,11 @@ def test_apprise_tagging(mock_post, mock_get):
title="my title", body="my body",
tag=[('TagE', 'TagD'), 'TagB']) is True)
# Garbage Entries
# Garbage Entries; we can't do anything with the tag so we have nothing to
# notify as a result. So we simply return None
assert(a.notify(
title="my title", body="my body",
tag=[(object, ), ]) is True)
tag=[(object, ), ]) is None)
def test_apprise_notify_formats(tmpdir):

View File

@ -307,6 +307,8 @@ def test_apprise_config_tagging(tmpdir):
assert len(ac.servers(tag='a,b')) == 3
# Now filter: a and b
assert len(ac.servers(tag=[('a', 'b')])) == 1
# all matches everything
assert len(ac.servers(tag='all')) == 3
def test_apprise_instantiate():
@ -624,9 +626,9 @@ def test_config_base_parse_inaccessible_text_file(mock_getsize, tmpdir):
assert len(ac.servers()) == 0
def test_config_base_parse_yaml_file(tmpdir):
def test_config_base_parse_yaml_file01(tmpdir):
"""
API: ConfigBase.parse_yaml_file
API: ConfigBase.parse_yaml_file (#1)
"""
t = tmpdir.mkdir("empty-file").join("apprise.yml")
@ -640,3 +642,100 @@ def test_config_base_parse_yaml_file(tmpdir):
# no notifications are loaded
assert len(ac.servers()) == 0
def test_config_base_parse_yaml_file02(tmpdir):
"""
API: ConfigBase.parse_yaml_file (#2)
"""
t = tmpdir.mkdir("matching-tags").join("apprise.yml")
t.write("""urls:
- pover://nsisxnvnqixq39t0cw54pxieyvtdd9@2jevtmstfg5a7hfxndiybasttxxfku:
- tag: test1
- pover://rg8ta87qngcrkc6t4qbykxktou0uug@tqs3i88xlufexwl8t4asglt4zp5wfn:
- tag: test2
- pover://jcqgnlyq2oetea4qg3iunahj8d5ijm@evalvutkhc8ipmz2lcgc70wtsm0qpb:
- tag: test3""")
# Create ourselves a config object
ac = AppriseConfig(paths=str(t))
# The number of configuration files that exist
assert len(ac) == 1
# no notifications are loaded
assert len(ac.servers()) == 3
# Test our ability to add Config objects to our apprise object
a = Apprise()
# Add our configuration object
assert a.add(servers=ac) is True
# Detect our 3 entry as they should have loaded successfully
assert len(a) == 3
# No match
assert sum(1 for _ in a.find('no-match')) == 0
# Match everything
assert sum(1 for _ in a.find('all')) == 3
# Match test1 entry
assert sum(1 for _ in a.find('test1')) == 1
# Match test2 entry
assert sum(1 for _ in a.find('test2')) == 1
# Match test3 entry
assert sum(1 for _ in a.find('test3')) == 1
# Match test1 or test3 entry
assert sum(1 for _ in a.find('test1, test3')) == 2
def test_config_base_parse_yaml_file03(tmpdir):
"""
API: ConfigBase.parse_yaml_file (#3)
"""
t = tmpdir.mkdir("bad-first-entry").join("apprise.yml")
# The first entry is -tag and not <dash><space>tag
# The element is therefore not picked up; This causes us to display
# some warning messages to the screen complaining of this typo yet
# still allowing us to load the URL since it is valid
t.write("""urls:
- pover://nsisxnvnqixq39t0cw54pxieyvtdd9@2jevtmstfg5a7hfxndiybasttxxfku:
-tag: test1
- pover://rg8ta87qngcrkc6t4qbykxktou0uug@tqs3i88xlufexwl8t4asglt4zp5wfn:
- tag: test2
- pover://jcqgnlyq2oetea4qg3iunahj8d5ijm@evalvutkhc8ipmz2lcgc70wtsm0qpb:
- tag: test3""")
# Create ourselves a config object
ac = AppriseConfig(paths=str(t))
# The number of configuration files that exist
assert len(ac) == 1
# no notifications lines processed is 3
assert len(ac.servers()) == 3
# Test our ability to add Config objects to our apprise object
a = Apprise()
# Add our configuration object
assert a.add(servers=ac) is True
# Detect our 3 entry as they should have loaded successfully
assert len(a) == 3
# No match
assert sum(1 for _ in a.find('no-match')) == 0
# Match everything
assert sum(1 for _ in a.find('all')) == 3
# No match for bad entry
assert sum(1 for _ in a.find('test1')) == 0
# Match test2 entry
assert sum(1 for _ in a.find('test2')) == 1
# Match test3 entry
assert sum(1 for _ in a.find('test3')) == 1
# Match test1 or test3 entry; (only matches test3)
assert sum(1 for _ in a.find('test1, test3')) == 1

View File

@ -23,6 +23,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import print_function
import re
import mock
from apprise import cli
from apprise import NotifyBase
@ -59,21 +60,21 @@ def test_apprise_cli(tmpdir):
# Pretend everything is okay
return True
def url(self):
def url(self, *args, **kwargs):
# Support url()
return ''
return 'good://'
class BadNotification(NotifyBase):
def __init__(self, *args, **kwargs):
super(BadNotification, self).__init__(*args, **kwargs)
def notify(self, **kwargs):
# Pretend everything is okay
# Force a notification failure
return False
def url(self):
def url(self, *args, **kwargs):
# Support url()
return ''
return 'bad://'
# Set up our notification types
SCHEMA_MAP['good'] = GoodNotification
@ -120,6 +121,16 @@ def test_apprise_cli(tmpdir):
])
assert result.exit_code == 1
# Testing with the --dry-run flag reveals a successful response since we
# don't actually execute the bad:// notification; we only display it
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'bad://localhost',
'--dry-run',
])
assert result.exit_code == 0
# Write a simple text based configuration file
t = tmpdir.mkdir("apprise-obj").join("apprise")
buf = """
@ -128,13 +139,14 @@ def test_apprise_cli(tmpdir):
"""
t.write(buf)
# This will read our configuration and send 2 notices to
# each of the above defined good:// entries
# This will read our configuration and not send any notices at all
# because we assigned tags to all of our urls and didn't identify
# a specific match below.
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
])
assert result.exit_code == 0
assert result.exit_code == 2
# This will send out 1 notification because our tag matches
# one of the entries above
@ -146,17 +158,6 @@ def test_apprise_cli(tmpdir):
])
assert result.exit_code == 0
# This will send out 0 notification because our tag requests that we meet
# at least 2 tags associated with the same notification service (which
# isn't the case above)
# translation: has taga AND tagd
result = runner.invoke(cli.main, [
'-b', 'has taga AND tagd',
'--config', str(t),
'--tag', 'taga,tagd',
])
assert result.exit_code == 0
# This will send out 2 notifications because by specifying 2 tag
# entries, we 'or' them together:
# translation: has taga or tagb or tagd
@ -169,6 +170,81 @@ def test_apprise_cli(tmpdir):
])
assert result.exit_code == 0
# Write a simple text based configuration file
t = tmpdir.mkdir("apprise-obj2").join("apprise-test2")
buf = """
good://localhost/1
good://localhost/2
good://localhost/3
good://localhost/4
good://localhost/5
myTag=good://localhost/6
"""
t.write(buf)
# This will read our configuration and send a notification to
# the first 5 entries in the list, but not the one that has
# the tag associated with it
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
])
assert result.exit_code == 0
# As a way of ensuring we match the first 5 entries, we can run a
# --dry-run against the same result set above and verify the output
result = runner.invoke(cli.main, [
'-b', 'test config',
'--config', str(t),
'--dry-run',
])
assert result.exit_code == 0
lines = re.split(r'[\r\n]', result.output.strip())
# 5 lines of all good:// entries matched
assert len(lines) == 5
# Verify we match against the remaining good:// entries
for i in range(0, 5):
assert lines[i].endswith('good://')
# This will fail because nothing matches mytag. It's case sensitive
# and we would only actually match against myTag
result = runner.invoke(cli.main, [
'-b', 'has taga',
'--config', str(t),
'--tag', 'mytag',
])
assert result.exit_code == 2
# Same command as the one identified above except we set the --dry-run
# flag. This causes our list of matched results to be printed only.
# However, since we don't match anything; we still fail with a return code
# of 2.
result = runner.invoke(cli.main, [
'-b', 'has taga',
'--config', str(t),
'--tag', 'mytag',
'--dry-run'
])
assert result.exit_code == 2
# Here is a case where we get what was expected
result = runner.invoke(cli.main, [
'-b', 'has taga',
'--config', str(t),
'--tag', 'myTag',
])
assert result.exit_code == 0
# Testing with the --dry-run flag reveals the same positive results
# because there was at least one match
result = runner.invoke(cli.main, [
'-b', 'has taga',
'--config', str(t),
'--tag', 'myTag',
'--dry-run',
])
assert result.exit_code == 0
@mock.patch('platform.system')
def test_apprise_cli_windows_env(mock_system):

View File

@ -644,14 +644,17 @@ def test_exclusive_match():
"""utils: is_exclusive_match() testing
"""
# No Logic always returns True
# No Logic always returns True if there is also no data
assert utils.is_exclusive_match(data=None, logic=None) is True
assert utils.is_exclusive_match(data=None, logic=set()) is True
assert utils.is_exclusive_match(data='', logic=set()) is True
assert utils.is_exclusive_match(data=u'', logic=set()) is True
assert utils.is_exclusive_match(data=u'check', logic=set()) is True
# however, once data is introduced, True is no longer returned
# if no logic has been specified
assert utils.is_exclusive_match(data=u'check', logic=set()) is False
assert utils.is_exclusive_match(
data=['check', 'checkb'], logic=set()) is True
data=['check', 'checkb'], logic=set()) is False
# String delimters are stripped out so that a list can be formed
# the below is just an empty token list
@ -696,6 +699,10 @@ def test_exclusive_match():
#
data = set(['abc', 'def', 'efg', 'xyz'])
# match_all matches everything
assert utils.is_exclusive_match(logic='all', data=data) is True
assert utils.is_exclusive_match(logic=['all'], data=data) is True
# def and abc in data
assert utils.is_exclusive_match(
logic=[('abc', 'def')], data=data) is True
@ -715,6 +722,18 @@ def test_exclusive_match():
assert utils.is_exclusive_match(
logic=['www', 'zzz', ('abc', 'jjj')], data=data) is False
#
# Empty data set
#
data = set()
assert utils.is_exclusive_match(logic=['www'], data=data) is False
assert utils.is_exclusive_match(logic='all', data=data) is True
# Change default value from 'all' to 'match_me'. Logic matches
# so we pass
assert utils.is_exclusive_match(
logic='match_me', data=data, match_all='match_me') is True
def test_environ_temporary_change():
"""utils: environ() testing