diff --git a/apprise/config/ConfigBase.py b/apprise/config/ConfigBase.py index 50a7baeb..e828cc18 100644 --- a/apprise/config/ConfigBase.py +++ b/apprise/config/ConfigBase.py @@ -805,16 +805,16 @@ class ConfigBase(URLBase): .format(key, no + 1)) continue - # Store our URL and Schema Regex - _url = key - # Store our schema schema = _schema.group('schema').lower() + # Store our URL and Schema Regex + _url = key + if _url is None: # the loop above failed to match anything ConfigBase.logger.warning( - 'Unsupported schema in urls, entry #{}'.format(no + 1)) + 'Unsupported URL, entry #{}'.format(no + 1)) continue _results = plugins.url_to_dict(_url) @@ -844,6 +844,11 @@ class ConfigBase(URLBase): if 'schema' in entries: del entries['schema'] + # support our special tokens (if they're present) + if schema in plugins.SCHEMA_MAP: + entries = ConfigBase.__extract_special_tokens( + schema, entries) + # Extend our dictionary with our new entries r.update(entries) @@ -851,6 +856,11 @@ class ConfigBase(URLBase): results.append(r) elif isinstance(tokens, dict): + # support our special tokens (if they're present) + if schema in plugins.SCHEMA_MAP: + tokens = ConfigBase.__extract_special_tokens( + schema, tokens) + # Copy ourselves a template of our parsed URL as a base to # work with r = _results.copy() @@ -949,6 +959,53 @@ class ConfigBase(URLBase): # Pop the element off of the stack return self._cached_servers.pop(index) + @staticmethod + def __extract_special_tokens(schema, tokens): + """ + This function takes a list of tokens and updates them to no longer + include any special tokens such as +,-, and : + + - schema must be a valid schema of a supported plugin type + - tokens must be a dictionary containing the yaml entries parsed. + + The idea here is we can post process a set of tokens provided in + a YAML file where the user provided some of the special keywords. + + We effectivley look up what these keywords map to their appropriate + value they're expected + """ + # Create a copy of our dictionary + tokens = tokens.copy() + + for kw, meta in plugins.SCHEMA_MAP[schema]\ + .template_kwargs.items(): + + # Determine our prefix: + prefix = meta.get('prefix', '+') + + # Detect any matches + matches = \ + {k[1:]: str(v) for k, v in tokens.items() + if k.startswith(prefix)} + + if not matches: + # we're done with this entry + continue + + if not isinstance(tokens.get(kw, None), dict): + # Invalid; correct it + tokens[kw] = dict() + + # strip out processed tokens + tokens = {k: v for k, v in tokens.items() + if not k.startswith(prefix)} + + # Update our entries + tokens[kw].update(matches) + + # Return our tokens + return tokens + def __getitem__(self, index): """ Returns the indexed server entry associated with the loaded diff --git a/apprise/plugins/NotifyMSTeams.py b/apprise/plugins/NotifyMSTeams.py index b12c5e45..7c573a59 100644 --- a/apprise/plugins/NotifyMSTeams.py +++ b/apprise/plugins/NotifyMSTeams.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2019 Chris Caron +# Copyright (C) 2020 Chris Caron # All rights reserved. # # This code is licensed under the MIT License. @@ -62,7 +62,7 @@ # import re import requests -from json import dumps +import json from .NotifyBase import NotifyBase from ..common import NotifyImageSize @@ -70,8 +70,18 @@ from ..common import NotifyType from ..common import NotifyFormat from ..utils import parse_bool from ..utils import validate_regex +from ..utils import apply_template +from ..utils import TemplateType +from ..AppriseAttachment import AppriseAttachment from ..AppriseLocale import gettext_lazy as _ +try: + from json.decoder import JSONDecodeError + +except ImportError: + # Python v2.7 Backwards Compatibility support + JSONDecodeError = ValueError + # Used to prepare our UUID regex matching UUID4_RE = \ r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}' @@ -106,6 +116,10 @@ class NotifyMSTeams(NotifyBase): # Default Notification Format notify_format = NotifyFormat.MARKDOWN + # There is no reason we should exceed 35KB when reading in a JSON file. + # If it is more than this, then it is not accepted + max_msteams_template_size = 35000 + # Define object templates templates = ( '{schema}://{token_a}/{token_b}{token_c}', @@ -150,12 +164,30 @@ class NotifyMSTeams(NotifyBase): 'default': False, 'map_to': 'include_image', }, + 'template': { + 'name': _('Template Path'), + 'type': 'string', + 'private': True, + }, }) + # Define our token control + template_kwargs = { + 'tokens': { + 'name': _('Template Tokens'), + 'prefix': ':', + }, + } + def __init__(self, token_a, token_b, token_c, include_image=True, - **kwargs): + template=None, tokens=None, **kwargs): """ Initialize Microsoft Teams Object + + You can optional specify a template and identify arguments you + wish to populate your template with when posting. Some reserved + template arguments that can not be over-ridden are: + `body`, `title`, and `type`. """ super(NotifyMSTeams, self).__init__(**kwargs) @@ -186,8 +218,120 @@ class NotifyMSTeams(NotifyBase): # Place a thumbnail image inline with the message body self.include_image = include_image + # Our template object is just an AppriseAttachment object + self.template = AppriseAttachment(asset=self.asset) + if template: + # Add our definition to our template + self.template.add(template) + # Enforce maximum file size + self.template[0].max_file_size = self.max_msteams_template_size + + # Template functionality + self.tokens = {} + if isinstance(tokens, dict): + self.tokens.update(tokens) + + elif tokens: + msg = 'The specified MSTeams Template Tokens ' \ + '({}) are not identified as a dictionary.'.format(tokens) + self.logger.warning(msg) + raise TypeError(msg) + + # else: NoneType - this is okay return + def gen_payload(self, body, title='', notify_type=NotifyType.INFO, + **kwargs): + """ + This function generates our payload whether it be the generic one + Apprise generates by default, or one provided by a specified + external template. + """ + + # Acquire our to-be footer icon if configured to do so + image_url = None if not self.include_image \ + else self.image_url(notify_type) + + if not self.template: + # By default we use a generic working payload if there was + # no template specified + payload = { + "@type": "MessageCard", + "@context": "https://schema.org/extensions", + "summary": self.app_desc, + "themeColor": self.color(notify_type), + "sections": [ + { + "activityImage": None, + "activityTitle": title, + "text": body, + }, + ] + } + + if image_url: + payload['sections'][0]['activityImage'] = image_url + + return payload + + # If our code reaches here, then we generate ourselves the payload + template = self.template[0] + if not template: + # We could not access the attachment + self.logger.error( + 'Could not access MSTeam template {}.'.format( + template.url(privacy=True))) + return False + + # Take a copy of our token dictionary + tokens = self.tokens.copy() + + # Apply some defaults template values + tokens['app_body'] = body + tokens['app_title'] = title + tokens['app_type'] = notify_type + tokens['app_id'] = self.app_id + tokens['app_desc'] = self.app_desc + tokens['app_color'] = self.color(notify_type) + tokens['app_image_url'] = image_url + tokens['app_url'] = self.app_url + + # Enforce Application mode + tokens['app_mode'] = TemplateType.JSON + + try: + with open(template.path, 'r') as fp: + content = json.loads(apply_template(fp.read(), **tokens)) + + except (OSError, IOError): + self.logger.error( + 'MSTeam template {} could not be read.'.format( + template.url(privacy=True))) + return None + + except JSONDecodeError as e: + self.logger.error( + 'MSTeam template {} contains invalid JSON.'.format( + template.url(privacy=True))) + self.logger.debug('JSONDecodeError: {}'.format(e)) + return None + + # Load our JSON data (if valid) + has_error = False + if '@type' not in content: + self.logger.error( + 'MSTeam template {} is missing @type kwarg.'.format( + template.url(privacy=True))) + has_error = True + + if '@context' not in content: + self.logger.error( + 'MSTeam template {} is missing @context kwarg.'.format( + template.url(privacy=True))) + has_error = True + + return content if not has_error else None + def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): """ Perform Microsoft Teams Notification @@ -205,27 +349,13 @@ class NotifyMSTeams(NotifyBase): self.token_c, ) - # Prepare our payload - payload = { - "@type": "MessageCard", - "@context": "https://schema.org/extensions", - "summary": self.app_desc, - "themeColor": self.color(notify_type), - "sections": [ - { - "activityImage": None, - "activityTitle": title, - "text": body, - }, - ] - } - - # Acquire our to-be footer icon if configured to do so - image_url = None if not self.include_image \ - else self.image_url(notify_type) - - if image_url: - payload['sections'][0]['activityImage'] = image_url + # Generate our payload if it's possible + payload = self.gen_payload( + body=body, title=title, notify_type=notify_type, **kwargs) + if not payload: + # No need to present a reason; that will come from the + # gen_payload() function itself + return False self.logger.debug('MSTeams POST URL: %s (cert_verify=%r)' % ( url, self.verify_certificate, @@ -237,7 +367,7 @@ class NotifyMSTeams(NotifyBase): try: r = requests.post( url, - data=dumps(payload), + data=json.dumps(payload), headers=headers, verify=self.verify_certificate, timeout=self.request_timeout, @@ -283,8 +413,14 @@ class NotifyMSTeams(NotifyBase): 'image': 'yes' if self.include_image else 'no', } + if self.template: + params['template'] = NotifyMSTeams.quote( + self.template[0].url(), safe='') + # Extend our parameters params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) + # Store any template entries if specified + params.update({':{}'.format(k): v for k, v in self.tokens.items()}) return '{schema}://{token_a}/{token_b}/{token_c}/'\ '?{params}'.format( @@ -341,6 +477,13 @@ class NotifyMSTeams(NotifyBase): results['include_image'] = \ parse_bool(results['qsd'].get('image', True)) + if 'template' in results['qsd'] and results['qsd']['template']: + results['template'] = \ + NotifyMSTeams.unquote(results['qsd']['template']) + + # Store our tokens + results['tokens'] = results['qsd:'] + return results @staticmethod diff --git a/apprise/utils.py b/apprise/utils.py index 8d092007..21f2c493 100644 --- a/apprise/utils.py +++ b/apprise/utils.py @@ -25,6 +25,7 @@ import re import six +import json import contextlib import os from os.path import expanduser @@ -95,9 +96,10 @@ TIDY_NUX_TRIM_RE = re.compile( # The handling of custom arguments passed in the URL; we treat any # argument (which would otherwise appear in the qsd area of our parse_url() -# function differently if they start with a + or - value +# function differently if they start with a +, - or : value NOTIFY_CUSTOM_ADD_TOKENS = re.compile(r'^( |\+)(?P.*)\s*') NOTIFY_CUSTOM_DEL_TOKENS = re.compile(r'^-(?P.*)\s*') +NOTIFY_CUSTOM_COLON_TOKENS = re.compile(r'^:(?P.*)\s*') # Used for attempting to acquire the schema if the URL can't be parsed. GET_SCHEMA_RE = re.compile(r'\s*(?P[a-z0-9]{2,9})://.*$', re.I) @@ -141,6 +143,19 @@ EMAIL_DETECTION_RE = re.compile( REGEX_VALIDATE_LOOKUP = {} +class TemplateType(object): + """ + Defines the different template types we can perform parsing on + """ + # RAW does nothing at all to the content being parsed + # data is taken at it's absolute value + RAW = 'raw' + + # Data is presumed to be of type JSON and is therefore escaped + # if required to do so (such as single quotes) + JSON = 'json' + + def is_ipaddr(addr, ipv4=True, ipv6=True): """ Validates against IPV4 and IPV6 IP Addresses @@ -318,10 +333,11 @@ def parse_qsd(qs): 'qsd': {}, # Detected Entries that start with + or - are additionally stored in - # these values (un-touched). The +/- however are stripped from their + # these values (un-touched). The :,+,- however are stripped from their # name before they are stored here. 'qsd+': {}, 'qsd-': {}, + 'qsd:': {}, } pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')] @@ -361,6 +377,12 @@ def parse_qsd(qs): # Store content 'as-is' result['qsd-'][k.group('key')] = val + # Check for tokens that start with a colon symbol (:) + k = NOTIFY_CUSTOM_COLON_TOKENS.match(key) + if k is not None: + # Store content 'as-is' + result['qsd:'][k.group('key')] = val + return result @@ -418,11 +440,12 @@ def parse_url(url, default_schema='http', verify_host=True): # qsd = Query String Dictionary 'qsd': {}, - # Detected Entries that start with + or - are additionally stored in - # these values (un-touched). The +/- however are stripped from their - # name before they are stored here. + # Detected Entries that start with +, - or : are additionally stored in + # these values (un-touched). The +, -, and : however are stripped + # from their name before they are stored here. 'qsd+': {}, 'qsd-': {}, + 'qsd:': {}, } qsdata = '' @@ -845,3 +868,45 @@ def environ(*remove, **update): finally: # Restore our snapshot os.environ = env_orig.copy() + + +def apply_template(template, app_mode=TemplateType.RAW, **kwargs): + """ + Takes a template in a str format and applies all of the keywords + and their values to it. + + The app$mode is used to dictact any pre-processing that needs to take place + to the escaped string prior to it being placed. The idea here is for + elements to be placed in a JSON response for example should be escaped + early in their string format. + + The template must contain keywords wrapped in in double + squirly braces like {{keyword}}. These are matched to the respected + kwargs passed into this function. + + If there is no match found, content is not swapped. + + """ + + def _escape_raw(content): + # No escaping necessary + return content + + def _escape_json(content): + # remove surounding quotes + return json.dumps(content)[1:-1] + + # Our escape function + fn = _escape_json if app_mode == TemplateType.JSON else _escape_raw + + lookup = [re.escape(x) for x in kwargs.keys()] + + # Compile this into a list + mask_r = re.compile( + re.escape('{{') + r'\s*(' + '|'.join(lookup) + r')\s*' + + re.escape('}}'), re.IGNORECASE) + + # we index 2 characters off the head and 2 characters from the tail + # to drop the '{{' and '}}' surrounding our match so that we can + # re-index it back into our list + return mask_r.sub(lambda x: fn(kwargs[x.group()[2:-2].strip()]), template) diff --git a/test/test_api.py b/test/test_api.py index affe78be..c0bddae7 100644 --- a/test/test_api.py +++ b/test/test_api.py @@ -776,18 +776,17 @@ def test_apprise_asset(tmpdir): ) a.default_html_color = '#abcabc' - a.html_notify_map[NotifyType.INFO] = '#aaaaaa' assert a.color('invalid', tuple) == (171, 202, 188) - assert a.color(NotifyType.INFO, tuple) == (170, 170, 170) + assert a.color(NotifyType.INFO, tuple) == (58, 163, 227) assert a.color('invalid', int) == 11258556 - assert a.color(NotifyType.INFO, int) == 11184810 + assert a.color(NotifyType.INFO, int) == 3843043 assert a.color('invalid', None) == '#abcabc' - assert a.color(NotifyType.INFO, None) == '#aaaaaa' + assert a.color(NotifyType.INFO, None) == '#3AA3E3' # None is the default - assert a.color(NotifyType.INFO) == '#aaaaaa' + assert a.color(NotifyType.INFO) == '#3AA3E3' # Invalid Type with pytest.raises(ValueError): @@ -1203,7 +1202,7 @@ def test_apprise_details_plugin_verification(): assert isinstance(arg['prefix'], six.string_types) if section == 'kwargs': # The only acceptable prefix types for kwargs - assert arg['prefix'] in ('+', '-') + assert arg['prefix'] in (':', '+', '-') else: # kwargs requires that the 'prefix' is defined diff --git a/test/test_config_base.py b/test/test_config_base.py index b35b0ad7..5e057acb 100644 --- a/test/test_config_base.py +++ b/test/test_config_base.py @@ -605,7 +605,7 @@ urls: """, asset=asset) - # We expect to parse 4 entries from the above because the tgram:// entry + # We expect to parse 6 entries from the above because the tgram:// entry # would have failed to be loaded assert isinstance(result, list) assert len(result) == 6 @@ -884,6 +884,20 @@ include: assert 'http://localhost/apprise/cfg02' in config assert 'http://localhost/apprise/cfg03' in config + # Test a configuration with an invalid schema with options + result, config = ConfigBase.config_parse_yaml(""" + urls: + - invalid://: + tag: 'invalid' + :name: 'Testing2' + :body: 'test body2' + :title: 'test title2' +""", asset=asset) + + # We will have loaded no results + assert isinstance(result, list) + assert len(result) == 0 + # Valid Configuration (we allow comma separated entries for # each defined bullet) result, config = ConfigBase.config_parse_yaml(""" diff --git a/test/test_msteam_plugin.py b/test/test_msteam_plugin.py new file mode 100644 index 00000000..f76c5ff6 --- /dev/null +++ b/test/test_msteam_plugin.py @@ -0,0 +1,548 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2020 Chris Caron +# All rights reserved. +# +# This code is licensed under the MIT License. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files(the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions : +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import mock +import json +import requests +import pytest +from apprise import Apprise +from apprise import AppriseConfig +from apprise import plugins +from apprise import NotifyType + +# Disable logging for a cleaner testing output +import logging +logging.disable(logging.CRITICAL) + + +@mock.patch('requests.post') +def test_msteams_templating(mock_post, tmpdir): + """ + API: NotifyMSTeams() Templating + + """ + # Disable Throttling to speed testing + plugins.NotifyBase.request_rate_per_sec = 0 + + # Prepare Mock + mock_post.return_value = requests.Request() + mock_post.return_value.status_code = requests.codes.ok + + uuid4 = '8b799edf-6f98-4d3a-9be7-2862fb4e5752' + url = 'msteams://{}@{}/{}/{}'.format(uuid4, uuid4, 'a' * 32, uuid4) + + # Test cases where our URL is invalid + template = tmpdir.join("simple.json") + template.write(""" + { + "@type": "MessageCard", + "@context": "https://schema.org/extensions", + "summary": "{{app_id}}", + "themeColor": "{{app_color}}", + "sections": [ + { + "activityImage": null, + "activityTitle": "{{app_title}}", + "text": "{{app_body}}" + } + ] + } + """) + + # Instantiate our URL + obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format( + url=url, + template=str(template), + kwargs=':key1=token&:key2=token', + )) + + assert isinstance(obj, plugins.NotifyMSTeams) + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is True + + assert mock_post.called is True + assert mock_post.call_args_list[0][0][0].startswith( + 'https://outlook.office.com/webhook/') + + # Our Posted JSON Object + posted_json = json.loads(mock_post.call_args_list[0][1]['data']) + assert 'summary' in posted_json + assert posted_json['summary'] == 'Apprise' + assert posted_json['themeColor'] == '#3AA3E3' + assert posted_json['sections'][0]['activityTitle'] == 'title' + assert posted_json['sections'][0]['text'] == 'body' + + # Test invalid JSON + + # Test cases where our URL is invalid + template = tmpdir.join("invalid.json") + template.write("}") + + # Instantiate our URL + obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format( + url=url, + template=str(template), + kwargs=':key1=token&:key2=token', + )) + + assert isinstance(obj, plugins.NotifyMSTeams) + # We will fail to preform our notifcation because the JSON is bad + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is False + + # Test cases where we're missing the @type part of the URL + template = tmpdir.join("missing_type.json") + template.write(""" + { + "@context": "https://schema.org/extensions", + "summary": "{{app_id}}", + "themeColor": "{{app_color}}", + "sections": [ + { + "activityImage": null, + "activityTitle": "{{app_title}}", + "text": "{{app_body}}" + } + ] + } + """) + + # Instantiate our URL + obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format( + url=url, + template=str(template), + kwargs=':key1=token&:key2=token', + )) + + assert isinstance(obj, plugins.NotifyMSTeams) + + # We can not load the file because we're missing the @type entry + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is False + + # Test cases where we're missing the @context part of the URL + template = tmpdir.join("missing_context.json") + template.write(""" + { + "@type": "MessageCard", + "summary": "{{app_id}}", + "themeColor": "{{app_color}}", + "sections": [ + { + "activityImage": null, + "activityTitle": "{{app_title}}", + "text": "{{app_body}}" + } + ] + } + """) + + # Instantiate our URL + obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format( + url=url, + template=str(template), + kwargs=':key1=token&:key2=token', + )) + + assert isinstance(obj, plugins.NotifyMSTeams) + # We can not load the file because we're missing the @context entry + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is False + + # Test a case where we can not access the file: + with mock.patch('json.loads', side_effect=OSError): + # we fail, but this time it's because we couldn't + # access the cached file contents for reading + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is False + + # A more complicated example; uses a target + mock_post.reset_mock() + template = tmpdir.join("more_complicated_example.json") + template.write(""" + { + "@type": "MessageCard", + "@context": "https://schema.org/extensions", + "summary": "{{app_desc}}", + "themeColor": "{{app_color}}", + "sections": [ + { + "activityImage": null, + "activityTitle": "{{app_title}}", + "text": "{{app_body}}" + } + ], + "potentialAction": [{ + "@type": "ActionCard", + "name": "Add a comment", + "inputs": [{ + "@type": "TextInput", + "id": "comment", + "isMultiline": false, + "title": "Add a comment here for this task." + }], + "actions": [{ + "@type": "HttpPOST", + "name": "Add Comment", + "target": "{{ target }}" + }] + }] + } + """) + + # Instantiate our URL + obj = Apprise.instantiate('{url}/?template={template}&{kwargs}'.format( + url=url, + template=str(template), + kwargs=':key1=token&:key2=token&:target=http://localhost', + )) + + assert isinstance(obj, plugins.NotifyMSTeams) + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is True + + assert mock_post.called is True + assert mock_post.call_args_list[0][0][0].startswith( + 'https://outlook.office.com/webhook/') + + # Our Posted JSON Object + posted_json = json.loads(mock_post.call_args_list[0][1]['data']) + assert 'summary' in posted_json + assert posted_json['summary'] == 'Apprise Notifications' + assert posted_json['themeColor'] == '#3AA3E3' + assert posted_json['sections'][0]['activityTitle'] == 'title' + assert posted_json['sections'][0]['text'] == 'body' + + # We even parsed our entry out of the URL + assert posted_json['potentialAction'][0]['actions'][0]['target'] \ + == 'http://localhost' + + +@mock.patch('requests.post') +def test_msteams_yaml_config(mock_post, tmpdir): + """ + API: NotifyMSTeams() YAML Configuration Entries + + """ + + # Disable Throttling to speed testing + plugins.NotifyBase.request_rate_per_sec = 0 + + # Prepare Mock + mock_post.return_value = requests.Request() + mock_post.return_value.status_code = requests.codes.ok + + uuid4 = '8b799edf-6f98-4d3a-9be7-2862fb4e5752' + url = 'msteams://{}@{}/{}/{}'.format(uuid4, uuid4, 'a' * 32, uuid4) + + # Test cases where our URL is invalid + template = tmpdir.join("simple.json") + template.write(""" + { + "@type": "MessageCard", + "@context": "https://schema.org/extensions", + "summary": "{{name}}", + "themeColor": "{{app_color}}", + "sections": [ + { + "activityImage": null, + "activityTitle": "{{title}}", + "text": "{{body}}" + } + ] + } + """) + + # Test Invalid Filename + config = tmpdir.join("msteams01.yml") + config.write(""" + urls: + - {url}: + - tag: 'msteams' + template: {template}.missing + :name: 'Template.Missing' + :body: 'test body' + :title: 'test title' + """.format(url=url, template=str(template))) + + cfg = AppriseConfig() + cfg.add(str(config)) + assert len(cfg) == 1 + assert len(cfg[0]) == 1 + + obj = cfg[0][0] + assert isinstance(obj, plugins.NotifyMSTeams) + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is False + assert mock_post.called is False + + # Test token identifiers + config = tmpdir.join("msteams01.yml") + config.write(""" + urls: + - {url}: + - tag: 'msteams' + template: {template} + :name: 'Testing' + :body: 'test body' + :title: 'test title' + """.format(url=url, template=str(template))) + + cfg = AppriseConfig() + cfg.add(str(config)) + assert len(cfg) == 1 + assert len(cfg[0]) == 1 + + obj = cfg[0][0] + assert isinstance(obj, plugins.NotifyMSTeams) + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is True + + assert mock_post.called is True + assert mock_post.call_args_list[0][0][0].startswith( + 'https://outlook.office.com/webhook/') + + # Our Posted JSON Object + posted_json = json.loads(mock_post.call_args_list[0][1]['data']) + assert 'summary' in posted_json + assert posted_json['summary'] == 'Testing' + assert posted_json['themeColor'] == '#3AA3E3' + assert posted_json['sections'][0]['activityTitle'] == 'test title' + assert posted_json['sections'][0]['text'] == 'test body' + + # + # Now again but without a bullet under the url definition + # + mock_post.reset_mock() + config = tmpdir.join("msteams02.yml") + config.write(""" + urls: + - {url}: + tag: 'msteams' + template: {template} + :name: 'Testing2' + :body: 'test body2' + :title: 'test title2' + """.format(url=url, template=str(template))) + + cfg = AppriseConfig() + cfg.add(str(config)) + assert len(cfg) == 1 + assert len(cfg[0]) == 1 + + obj = cfg[0][0] + assert isinstance(obj, plugins.NotifyMSTeams) + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is True + + assert mock_post.called is True + assert mock_post.call_args_list[0][0][0].startswith( + 'https://outlook.office.com/webhook/') + + # Our Posted JSON Object + posted_json = json.loads(mock_post.call_args_list[0][1]['data']) + assert 'summary' in posted_json + assert posted_json['summary'] == 'Testing2' + assert posted_json['themeColor'] == '#3AA3E3' + assert posted_json['sections'][0]['activityTitle'] == 'test title2' + assert posted_json['sections'][0]['text'] == 'test body2' + + # + # Try again but store the content as a dictionary in the cofiguration file + # + mock_post.reset_mock() + config = tmpdir.join("msteams03.yml") + config.write(""" + urls: + - {url}: + - tag: 'msteams' + template: {template} + tokens: + name: 'Testing3' + body: 'test body3' + title: 'test title3' + """.format(url=url, template=str(template))) + + cfg = AppriseConfig() + cfg.add(str(config)) + assert len(cfg) == 1 + assert len(cfg[0]) == 1 + + obj = cfg[0][0] + assert isinstance(obj, plugins.NotifyMSTeams) + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is True + + assert mock_post.called is True + assert mock_post.call_args_list[0][0][0].startswith( + 'https://outlook.office.com/webhook/') + + # Our Posted JSON Object + posted_json = json.loads(mock_post.call_args_list[0][1]['data']) + assert 'summary' in posted_json + assert posted_json['summary'] == 'Testing3' + assert posted_json['themeColor'] == '#3AA3E3' + assert posted_json['sections'][0]['activityTitle'] == 'test title3' + assert posted_json['sections'][0]['text'] == 'test body3' + + # + # Now again but without a bullet under the url definition + # + mock_post.reset_mock() + config = tmpdir.join("msteams04.yml") + config.write(""" + urls: + - {url}: + tag: 'msteams' + template: {template} + tokens: + name: 'Testing4' + body: 'test body4' + title: 'test title4' + """.format(url=url, template=str(template))) + + cfg = AppriseConfig() + cfg.add(str(config)) + assert len(cfg) == 1 + assert len(cfg[0]) == 1 + + obj = cfg[0][0] + assert isinstance(obj, plugins.NotifyMSTeams) + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is True + + assert mock_post.called is True + assert mock_post.call_args_list[0][0][0].startswith( + 'https://outlook.office.com/webhook/') + + # Our Posted JSON Object + posted_json = json.loads(mock_post.call_args_list[0][1]['data']) + assert 'summary' in posted_json + assert posted_json['summary'] == 'Testing4' + assert posted_json['themeColor'] == '#3AA3E3' + assert posted_json['sections'][0]['activityTitle'] == 'test title4' + assert posted_json['sections'][0]['text'] == 'test body4' + + # Now let's do a combination of the two + mock_post.reset_mock() + config = tmpdir.join("msteams05.yml") + config.write(""" + urls: + - {url}: + - tag: 'msteams' + template: {template} + tokens: + body: 'test body5' + title: 'test title5' + :name: 'Testing5' + """.format(url=url, template=str(template))) + + cfg = AppriseConfig() + cfg.add(str(config)) + assert len(cfg) == 1 + assert len(cfg[0]) == 1 + + obj = cfg[0][0] + assert isinstance(obj, plugins.NotifyMSTeams) + assert obj.notify( + body="body", title='title', + notify_type=NotifyType.INFO) is True + + assert mock_post.called is True + assert mock_post.call_args_list[0][0][0].startswith( + 'https://outlook.office.com/webhook/') + + # Our Posted JSON Object + posted_json = json.loads(mock_post.call_args_list[0][1]['data']) + assert 'summary' in posted_json + assert posted_json['summary'] == 'Testing5' + assert posted_json['themeColor'] == '#3AA3E3' + assert posted_json['sections'][0]['activityTitle'] == 'test title5' + assert posted_json['sections'][0]['text'] == 'test body5' + + # Now let's do a test where our tokens is not the expected + # dictionary we want to see + mock_post.reset_mock() + config = tmpdir.join("msteams06.yml") + config.write(""" + urls: + - {url}: + - tag: 'msteams' + template: {template} + # Not a dictionary + tokens: + body + """.format(url=url, template=str(template))) + + cfg = AppriseConfig() + cfg.add(str(config)) + assert len(cfg) == 1 + + # It could not load because of invalid tokens + assert len(cfg[0]) == 0 + + +def test_notify_msteams_plugin(): + """ + API: NotifyMSTeams() Extra Checks + + """ + # Initializes the plugin with an invalid token + with pytest.raises(TypeError): + plugins.NotifyMSTeams(token_a=None, token_b='abcd', token_c='abcd') + # Whitespace also acts as an invalid token value + with pytest.raises(TypeError): + plugins.NotifyMSTeams(token_a=' ', token_b='abcd', token_c='abcd') + + with pytest.raises(TypeError): + plugins.NotifyMSTeams(token_a='abcd', token_b=None, token_c='abcd') + # Whitespace also acts as an invalid token value + with pytest.raises(TypeError): + plugins.NotifyMSTeams(token_a='abcd', token_b=' ', token_c='abcd') + + with pytest.raises(TypeError): + plugins.NotifyMSTeams(token_a='abcd', token_b='abcd', token_c=None) + # Whitespace also acts as an invalid token value + with pytest.raises(TypeError): + plugins.NotifyMSTeams(token_a='abcd', token_b='abcd', token_c=' ') + + uuid4 = '8b799edf-6f98-4d3a-9be7-2862fb4e5752' + token_a = '{}@{}'.format(uuid4, uuid4) + token_b = 'A' * 32 + # test case where no tokens are specified + obj = plugins.NotifyMSTeams( + token_a=token_a, token_b=token_b, token_c=uuid4) + assert isinstance(obj, plugins.NotifyMSTeams) diff --git a/test/test_rest_plugins.py b/test/test_rest_plugins.py index 0fce6dbb..0c2e1705 100644 --- a/test/test_rest_plugins.py +++ b/test/test_rest_plugins.py @@ -5256,31 +5256,6 @@ def test_notify_msg91_plugin(mock_post): plugins.NotifyMSG91(authkey=" ", targets=target) -def test_notify_msteams_plugin(): - """ - API: NotifyMSTeams() Extra Checks - - """ - # Initializes the plugin with an invalid token - with pytest.raises(TypeError): - plugins.NotifyMSTeams(token_a=None, token_b='abcd', token_c='abcd') - # Whitespace also acts as an invalid token value - with pytest.raises(TypeError): - plugins.NotifyMSTeams(token_a=' ', token_b='abcd', token_c='abcd') - - with pytest.raises(TypeError): - plugins.NotifyMSTeams(token_a='abcd', token_b=None, token_c='abcd') - # Whitespace also acts as an invalid token value - with pytest.raises(TypeError): - plugins.NotifyMSTeams(token_a='abcd', token_b=' ', token_c='abcd') - - with pytest.raises(TypeError): - plugins.NotifyMSTeams(token_a='abcd', token_b='abcd', token_c=None) - # Whitespace also acts as an invalid token value - with pytest.raises(TypeError): - plugins.NotifyMSTeams(token_a='abcd', token_b='abcd', token_c=' ') - - def test_notify_prowl_plugin(): """ API: NotifyProwl() Extra Checks diff --git a/test/test_utils.py b/test/test_utils.py index 234414f8..b187da0d 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -26,6 +26,7 @@ from __future__ import print_function import re import os +import six try: # Python 2.7 from urllib import unquote @@ -46,10 +47,11 @@ def test_parse_qsd(): result = utils.parse_qsd('a=1&b=&c&d=abcd') assert isinstance(result, dict) is True - assert len(result) == 3 + assert len(result) == 4 assert 'qsd' in result assert 'qsd+' in result assert 'qsd-' in result + assert 'qsd:' in result assert len(result['qsd']) == 4 assert 'a' in result['qsd'] @@ -59,6 +61,7 @@ def test_parse_qsd(): assert len(result['qsd-']) == 0 assert len(result['qsd+']) == 0 + assert len(result['qsd:']) == 0 def test_parse_url(): @@ -77,6 +80,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('http://hostname/') assert result['schema'] == 'http' @@ -91,6 +95,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} # colon after hostname without port number is no good assert utils.parse_url('http://hostname:') is None @@ -109,6 +114,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} # A port of Zero is not valid assert utils.parse_url('http://hostname:0') is None @@ -127,6 +133,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('http://hostname/?-KeY=Value') assert result['schema'] == 'http' @@ -143,6 +150,7 @@ def test_parse_url(): assert 'KeY' in result['qsd-'] assert unquote(result['qsd-']['KeY']) == 'Value' assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('http://hostname/?+KeY=Value') assert result['schema'] == 'http' @@ -158,9 +166,26 @@ def test_parse_url(): assert 'KeY' in result['qsd+'] assert result['qsd+']['KeY'] == 'Value' assert result['qsd-'] == {} + assert result['qsd:'] == {} + + result = utils.parse_url('http://hostname/?:kEy=vALUE') + assert result['schema'] == 'http' + assert result['host'] == 'hostname' + assert result['port'] is None + assert result['user'] is None + assert result['password'] is None + assert result['fullpath'] == '/' + assert result['path'] == '/' + assert result['query'] is None + assert result['url'] == 'http://hostname/' + assert ':key' in result['qsd'] + assert 'kEy' in result['qsd:'] + assert result['qsd:']['kEy'] == 'vALUE' + assert result['qsd+'] == {} + assert result['qsd-'] == {} result = utils.parse_url( - 'http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C') + 'http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C&:colon=y') assert result['schema'] == 'http' assert result['host'] == 'hostname' assert result['port'] is None @@ -172,6 +197,8 @@ def test_parse_url(): assert result['url'] == 'http://hostname/' assert '+key' in result['qsd'] assert '-key' in result['qsd'] + assert ':colon' in result['qsd'] + assert result['qsd:']['colon'] == 'y' assert 'key' in result['qsd'] assert 'KeY' in result['qsd+'] assert result['qsd+']['KeY'] == 'ValueA' @@ -194,6 +221,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('http://hostname:40////') assert result['schema'] == 'http' @@ -208,6 +236,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('HTTP://HoStNaMe:40/test.php') assert result['schema'] == 'http' @@ -222,6 +251,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('HTTPS://user@hostname/test.py') assert result['schema'] == 'https' @@ -236,6 +266,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url(' HTTPS://///user@@@hostname///test.py ') assert result['schema'] == 'https' @@ -250,6 +281,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url( 'HTTPS://user:password@otherHost/full///path/name/', @@ -266,6 +298,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} # Handle garbage assert utils.parse_url(None) is None @@ -293,6 +326,7 @@ def test_parse_url(): assert unquote(result['qsd']['format']) == 'text' assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} # Test Passwords with question marks ?; not supported result = utils.parse_url( @@ -316,6 +350,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} # just host and path result = utils.parse_url('invalid/host') @@ -331,6 +366,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} # just all out invalid assert utils.parse_url('?') is None @@ -362,6 +398,7 @@ def test_parse_url(): assert result['qsd'] == {} assert result['qsd-'] == {} assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('testhostname') assert result['schema'] == 'http' @@ -376,6 +413,8 @@ def test_parse_url(): assert result['url'] == 'http://testhostname' assert result['qsd'] == {} assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('example.com', default_schema='unknown') assert result['schema'] == 'unknown' @@ -390,6 +429,8 @@ def test_parse_url(): assert result['url'] == 'unknown://example.com' assert result['qsd'] == {} assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} # An empty string without a hostame is still valid if verify_host is set result = utils.parse_url('', verify_host=False) @@ -405,6 +446,8 @@ def test_parse_url(): assert result['url'] == 'http://' assert result['qsd'] == {} assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} # A messed up URL result = utils.parse_url('test://:@/', verify_host=False) @@ -419,6 +462,8 @@ def test_parse_url(): assert result['url'] == 'test://:@/' assert result['qsd'] == {} assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} result = utils.parse_url('crazy://:@//_/@^&/jack.json', verify_host=False) assert result['schema'] == 'crazy' @@ -432,6 +477,8 @@ def test_parse_url(): assert unquote(result['url']) == 'crazy://:@/_/@^&/jack.json' assert result['qsd'] == {} assert result['qsd-'] == {} + assert result['qsd+'] == {} + assert result['qsd:'] == {} def test_parse_bool(): @@ -1159,3 +1206,61 @@ def test_environ_temporary_change(): # Even our temporary variables are now missing assert n_key not in os.environ assert d_key not in os.environ + + +def test_apply_templating(): + """utils: apply_template() testing + """ + + template = "Hello {{fname}}, How are you {{whence}}?" + + result = utils.apply_template( + template, **{'fname': 'Chris', 'whence': 'this morning'}) + assert isinstance(result, six.string_types) is True + assert result == "Hello Chris, How are you this morning?" + + # In this example 'whence' isn't provided, so it isn't swapped + result = utils.apply_template( + template, **{'fname': 'Chris'}) + assert isinstance(result, six.string_types) is True + assert result == "Hello Chris, How are you {{whence}}?" + + # white space won't cause any ill affects: + template = "Hello {{ fname }}, How are you {{ whence}}?" + result = utils.apply_template( + template, **{'fname': 'Chris', 'whence': 'this morning'}) + assert isinstance(result, six.string_types) is True + assert result == "Hello Chris, How are you this morning?" + + # No arguments won't cause any problems + template = "Hello {{fname}}, How are you {{whence}}?" + result = utils.apply_template(template) + assert isinstance(result, six.string_types) is True + assert result == template + + # Wrong elements are simply ignored + result = utils.apply_template( + template, + **{'fname': 'l2g', 'whence': 'this evening', 'ignore': 'me'}) + assert isinstance(result, six.string_types) is True + assert result == "Hello l2g, How are you this evening?" + + # Empty template makes things easy + result = utils.apply_template( + "", **{'fname': 'l2g', 'whence': 'this evening'}) + assert isinstance(result, six.string_types) is True + assert result == "" + + # Regular expressions are safely escapped and act as normal + # tokens: + template = "Hello {{.*}}, How are you {{[A-Z0-9]+}}?" + result = utils.apply_template( + template, **{'.*': 'l2g', '[A-Z0-9]+': 'this afternoon'}) + assert result == "Hello l2g, How are you this afternoon?" + + # JSON is handled too such as escaping quotes + template = '{value: "{{ value }}"}' + result = utils.apply_template( + template, app_mode=utils.TemplateType.JSON, + **{'value': '"quotes are escaped"'}) + assert result == '{value: "\\"quotes are escaped\\""}'