# -*- coding: utf-8 -*- # # Copyright (C) 2019 Chris Caron # All rights reserved. # # This code is licensed under the MIT License. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files(the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and / or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions : # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. from django.shortcuts import render from django.http import HttpResponse from django.http import JsonResponse from django.views import View from django.conf import settings from django.utils.html import escape from django.utils.decorators import method_decorator from django.views.decorators.cache import never_cache from django.views.decorators.gzip import gzip_page from django.utils.translation import gettext_lazy as _ from django.core.serializers.json import DjangoJSONEncoder from .utils import parse_attachments from .utils import ConfigCache from .utils import apply_global_filters from .utils import send_webhook from .forms import AddByUrlForm from .forms import AddByConfigForm from .forms import NotifyForm from .forms import NotifyByUrlForm from .forms import CONFIG_FORMATS from .forms import AUTO_DETECT_CONFIG_KEYWORD import logging import apprise import json import re # Get an instance of a logger logger = logging.getLogger('django') # Content-Type Parsing # application/x-www-form-urlencoded # application/x-www-form-urlencoded # multipart/form-data MIME_IS_FORM = re.compile( r'(multipart|application)/(x-www-)?form-(data|urlencoded)', re.I) # Support JSON formats # text/json # text/x-json # application/json # application/x-json MIME_IS_JSON = re.compile( r'(text|application)/(x-)?json', re.I) # Tags separated by space , &, or + are and'ed together # Tags separated by commas (even commas wrapped in spaces) are "or'ed" together # We start with a regular expression used to clean up provided tags TAG_VALIDATION_RE = re.compile(r'^\s*[a-z0-9\s| ,_-]+\s*$', re.IGNORECASE) # In order to separate our tags only by comma's or '|' entries found TAG_DETECT_RE = re.compile( r'\s*([a-z0-9\s_&+-]+)(?=$|\s*[|,]\s*[a-z0-9\s&+_-|,])', re.I) # Break apart our objects anded together TAG_AND_DELIM_RE = re.compile(r'[\s&+]+') MIME_IS_JSON = re.compile( r'(text|application)/(x-)?json', re.I) class JSONEncoder(DjangoJSONEncoder): """ A wrapper to the DjangoJSONEncoder to support sets() (converting them to lists). """ def default(self, obj): if isinstance(obj, set): return list(obj) elif isinstance(obj, apprise.AppriseLocale.LazyTranslation): return str(obj) return super().default(obj) class ResponseCode(object): """ These codes are based on those provided by the requests object """ okay = 200 no_content = 204 bad_request = 400 no_access = 403 not_found = 404 method_not_allowed = 405 method_not_accepted = 406 failed_dependency = 424 internal_server_error = 500 class WelcomeView(View): """ A simple welcome/index page """ template_name = 'welcome.html' def get(self, request): default_key = 'KEY' key = request.GET.get('key', default_key).strip() return render(request, self.template_name, { 'secure': request.scheme[-1].lower() == 's', 'key': key if key else default_key, }) @method_decorator((gzip_page, never_cache), name='dispatch') class DetailsView(View): """ A Django view used to list all supported endpoints """ template_name = 'details.html' def get(self, request): """ Handle a GET request """ # Detect the format our response should be in json_response = \ MIME_IS_JSON.match( request.content_type if request.content_type else request.headers.get( 'accept', request.headers.get( 'content-type', ''))) is not None # Show All flag # Support 'yes', '1', 'true', 'enable', 'active', and + show_all = request.GET.get('all', 'no')[0].lower() in ( 'a', 'y', '1', 't', 'e', '+') # Our status status = ResponseCode.okay # # Apply Any Global Filters (if identified) # apply_global_filters() # Create an Apprise Object a_obj = apprise.Apprise() # Load our details details = a_obj.details(show_disabled=show_all) # Sort our result set details['schemas'] = sorted( details['schemas'], key=lambda i: str(i['service_name']).upper()) # Return our content return render(request, self.template_name, { 'show_all': show_all, 'details': details, }, status=status) if not json_response else \ JsonResponse( details, encoder=JSONEncoder, safe=False, status=status) @method_decorator(never_cache, name='dispatch') class ConfigView(View): """ A Django view used to manage configuration """ template_name = 'config.html' def get(self, request, key): """ Handle a GET request """ return render(request, self.template_name, { 'key': key, 'form_url': AddByUrlForm(), 'form_cfg': AddByConfigForm(), 'form_notify': NotifyForm(), }) @method_decorator(never_cache, name='dispatch') class AddView(View): """ A Django view used to store Apprise configuration """ def post(self, request, key): """ Handle a POST request """ # Detect the format our response should be in json_response = \ MIME_IS_JSON.match( request.content_type if request.content_type else request.headers.get( 'accept', request.headers.get( 'content-type', ''))) is not None if settings.APPRISE_CONFIG_LOCK: # General Access Control msg = _('The site has been configured to deny this request.') status = ResponseCode.no_access return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # our content content = {} if MIME_IS_FORM.match(request.content_type): content = {} form = AddByConfigForm(request.POST) if form.is_valid(): content.update(form.cleaned_data) form = AddByUrlForm(request.POST) if form.is_valid(): content.update(form.cleaned_data) elif json_response: # Prepare our default response try: # load our JSON content content = json.loads(request.body.decode('utf-8')) except (AttributeError, ValueError): # could not parse JSON response... return JsonResponse({ 'error': _('Invalid JSON specified.'), }, encoder=JSONEncoder, safe=False, status=ResponseCode.bad_request, ) if not content: # No information was posted msg = _('The message format is not supported.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # Create ourselves an apprise object to work with a_obj = apprise.Apprise() if 'urls' in content: # Load our content a_obj.add(content['urls']) if not len(a_obj): # No URLs were loaded msg = _('No valid URLs were found.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) if not ConfigCache.put( key, '\r\n'.join([s.url() for s in a_obj]), apprise.ConfigFormat.TEXT): msg = _('The configuration could not be saved.') status = ResponseCode.internal_server_error return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) elif 'config' in content: fmt = content.get('format', '').lower() if fmt not in [i[0] for i in CONFIG_FORMATS]: # Format must be one supported by apprise msg = _('The format specified is invalid.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # prepare our apprise config object ac_obj = apprise.AppriseConfig() if fmt == AUTO_DETECT_CONFIG_KEYWORD: # By setting format to None, it is automatically detected from # within the add_config() call fmt = None # Load our configuration if not ac_obj.add_config(content['config'], format=fmt): # The format could not be detected msg = _('The configuration format could not be detected.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # Add our configuration a_obj.add(ac_obj) if not len(a_obj): # No specified URL(s) were loaded due to # mis-configuration on the caller's part msg = _('No valid URL(s) were specified.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) if not ConfigCache.put( key, content['config'], fmt=ac_obj[0].config_format): # Something went very wrong; return 500 msg = _('An error occured saving configuration.') status = ResponseCode.internal_server_error return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) else: # No configuration specified; we're done msg = _('No configuration specified.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # If we reach here; we successfully loaded the configuration so we can # go ahead and write it to disk and alert our caller of the success. return HttpResponse( _('Successfully saved configuration.'), status=ResponseCode.okay, ) @method_decorator(never_cache, name='dispatch') class DelView(View): """ A Django view for removing content associated with a key """ def post(self, request, key): """ Handle a POST request """ # Detect the format our response should be in json_response = \ MIME_IS_JSON.match( request.content_type if request.content_type else request.headers.get( 'accept', request.headers.get( 'content-type', ''))) is not None if settings.APPRISE_CONFIG_LOCK: # General Access Control msg = _('The site has been configured to deny this request.') status = ResponseCode.no_access return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # Clear the key result = ConfigCache.clear(key) if result is None: msg = _('There was no configuration to remove.') status = ResponseCode.no_content return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) elif result is False: # There was a failure at the os level msg = _('The configuration could not be removed.') status = ResponseCode.internal_server_error return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # Removed content return HttpResponse( _('Successfully removed configuration.'), status=ResponseCode.okay, ) @method_decorator((gzip_page, never_cache), name='dispatch') class GetView(View): """ A Django view used to retrieve previously stored Apprise configuration """ def post(self, request, key): """ Handle a POST request """ # Detect the format our response should be in json_response = \ MIME_IS_JSON.match( request.content_type if request.content_type else request.headers.get( 'accept', request.headers.get( 'content-type', ''))) is not None if settings.APPRISE_CONFIG_LOCK: # General Access Control msg = _('The site has been configured to deny this request.') status = ResponseCode.no_access return HttpResponse(msg, status=status) \ if not json_response else JsonResponse( {'error': msg}, encoder=JSONEncoder, safe=False, status=status) config, format = ConfigCache.get(key) if config is None: # The returned value of config and format tell a rather cryptic # story; this portion could probably be updated in the future. # but for now it reads like this: # config == None and format == None: We had an internal error # config == None and format != None: we simply have no data # config != None: we simply have no data if format is not None: # no content to return msg = _('There was no configuration found.') status = ResponseCode.no_content return HttpResponse(msg, status=status) \ if not json_response else JsonResponse( {'error': msg}, encoder=JSONEncoder, safe=False, status=status) # Something went very wrong; return 500 msg = _('An error occured accessing configuration.') status = ResponseCode.internal_server_error return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # Our configuration was retrieved; now our response varies on whether # we are a YAML configuration or a TEXT based one. This allows us to # be compatible with those using the AppriseConfig() library or the # reference to it through the --config (-c) option in the CLI. content_type = 'text/yaml; charset=utf-8' \ if format == apprise.ConfigFormat.YAML \ else 'text/html; charset=utf-8' # Return our retrieved content return HttpResponse( config, content_type=content_type, status=ResponseCode.okay, ) if not json_response else JsonResponse({ 'format': format, 'config': config}, encoder=JSONEncoder, safe=False, status=ResponseCode.okay, ) @method_decorator((gzip_page, never_cache), name='dispatch') class NotifyView(View): """ A Django view for sending a notification in a stateful manner """ def post(self, request, key): """ Handle a POST request """ # Detect the format our response should be in json_response = \ MIME_IS_JSON.match( request.content_type if request.content_type else request.headers.get( 'accept', request.headers.get( 'content-type', ''))) is not None # our content content = {} if MIME_IS_FORM.match(request.content_type): form = NotifyForm(data=request.POST, files=request.FILES) if form.is_valid(): content.update(form.cleaned_data) elif json_response: # Prepare our default response try: # load our JSON content content = json.loads(request.body.decode('utf-8')) except (AttributeError, ValueError): # could not parse JSON response... logger.warning( 'NOTIFY - %s - Invalid JSON Payload provided', request.META['REMOTE_ADDR']) return JsonResponse( _('Invalid JSON provided.'), encoder=JSONEncoder, safe=False, status=ResponseCode.bad_request) if not content: # We could not handle the Content-Type logger.warning( 'NOTIFY - %s - Invalid FORM Payload provided', request.META['REMOTE_ADDR']) msg = _('Bad FORM Payload provided.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status ) # Handle Attachments attach = None if not content.get('attachment') and 'attachment' in request.POST: # Acquire attachments to work with them content['attachment'] = request.POST.getlist('attachment') if 'attachment' in content or request.FILES: try: attach = parse_attachments( content.get('attachment'), request.FILES) except (TypeError, ValueError): # Invalid entry found in list logger.warning( 'NOTIFY - %s - Bad attachment specified', request.META['REMOTE_ADDR']) return HttpResponse( _('Bad attachment'), status=ResponseCode.bad_request) # # Allow 'tag' value to be specified as part of the URL parameters # if not found otherwise defined. # tag = content.get('tag', content.get('tags')) if not tag: # Allow GET parameter over-rides if 'tag' in request.GET: tag = request.GET['tag'] elif 'tags' in request.GET: tag = request.GET['tags'] # Validation - Tag Logic: # "TagA" : TagA # "TagA, TagB" : TagA OR TagB # "TagA TagB" : TagA AND TagB # "TagA TagC, TagB" : (TagA AND TagC) OR TagB # ['TagA', 'TagB'] : TagA OR TagB # [('TagA', 'TagC'), 'TagB'] : (TagA AND TagC) OR TagB # [('TagB', 'TagC')] : TagB AND TagC if tag: if isinstance(tag, (list, set, tuple)): # Assign our tags as they were provided content['tag'] = tag elif isinstance(tag, str): if not TAG_VALIDATION_RE.match(tag): # Invalid entry found in list logger.warning( 'NOTIFY - %s - Ignored invalid tag specified ' '(type %s): %s', request.META['REMOTE_ADDR'], str(type(tag)), str(tag)[:12]) msg = _('Unsupported characters found in tag definition.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # If we get here, our specified tag was valid tags = [] for _tag in TAG_DETECT_RE.findall(tag): tag = _tag.strip() if not tag: continue # Disect our results group = TAG_AND_DELIM_RE.split(tag) if len(group) > 1: tags.append(tuple(group)) else: tags.append(tag) # Assign our tags content['tag'] = tags else: # Could be int, float or some other unsupported type logger.warning( 'NOTIFY - %s - Ignored invalid tag specified (type %s): ' '%s', request.META['REMOTE_ADDR'], str(type(tag)), str(tag)[:12]) msg = _('Unsupported characters found in tag definition.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # # Allow 'format' value to be specified as part of the URL # parameters if not found otherwise defined. # if not content.get('format') and 'format' in request.GET: content['format'] = request.GET['format'] # # Allow 'type' value to be specified as part of the URL parameters # if not found otherwise defined. # if not content.get('type') and 'type' in request.GET: content['type'] = request.GET['type'] # # Allow 'title' value to be specified as part of the URL parameters # if not found otherwise defined. # if not content.get('title') and 'title' in request.GET: content['title'] = request.GET['title'] # Some basic error checking if not content.get('body') and not attach or \ content.get('type', apprise.NotifyType.INFO) \ not in apprise.NOTIFY_TYPES: logger.warning( 'NOTIFY - %s - Payload lacks minimum requirements', request.META['REMOTE_ADDR']) return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': _('Payload lacks minimum requirements.'), }, encoder=JSONEncoder, safe=False, status=ResponseCode.bad_request, ) # Acquire our body format (if identified) body_format = content.get('format', apprise.NotifyFormat.TEXT) if body_format and body_format not in apprise.NOTIFY_FORMATS: logger.warning( 'NOTIFY - %s - Format parameter contains an unsupported ' 'value (%s)', request.META['REMOTE_ADDR'], str(body_format)) msg = _('An invalid body input format was specified.') status = ResponseCode.bad_request return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # If we get here, we have enough information to generate a notification # with. config, format = ConfigCache.get(key) if config is None: # The returned value of config and format tell a rather cryptic # story; this portion could probably be updated in the future. # but for now it reads like this: # config == None and format == None: We had an internal error # config == None and format != None: we simply have no data # config != None: we simply have no data if format is not None: # no content to return logger.debug( 'NOTIFY - %s - Empty configuration found using KEY: %s', request.META['REMOTE_ADDR'], key) msg = _('There was no configuration found.') status = ResponseCode.no_content return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # Something went very wrong; return 500 msg = _('An error occured accessing configuration.') status = ResponseCode.internal_server_error logger.error( 'NOTIFY - %s - I/O error accessing configuration ' 'using KEY: %s', request.META['REMOTE_ADDR'], key) return HttpResponse(msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) # Prepare our keyword arguments (to be passed into an AppriseAsset # object) kwargs = { 'plugin_paths': settings.APPRISE_PLUGIN_PATHS, } if body_format: # Store our defined body format kwargs['body_format'] = body_format # Acquire our recursion count (if defined) recursion = request.headers.get('X-Apprise-Recursion-Count', 0) try: recursion = int(recursion) if recursion < 0: # We do not accept negative numbers raise TypeError("Invalid Recursion Value") if recursion > settings.APPRISE_RECURSION_MAX: logger.warning( 'NOTIFY - %s - Recursion limit reached (%d > %d)', request.META['REMOTE_ADDR'], recursion, settings.APPRISE_RECURSION_MAX) return HttpResponse( _('The recursion limit has been reached.'), status=ResponseCode.method_not_accepted) # Store our recursion value for our AppriseAsset() initialization kwargs['_recursion'] = recursion except (TypeError, ValueError): logger.warning( 'NOTIFY - %s - Invalid recursion value (%s) provided', request.META['REMOTE_ADDR'], str(recursion)) return HttpResponse( _('An invalid recursion value was specified.'), status=ResponseCode.bad_request) # Acquire our unique identifier (if defined) uid = request.headers.get('X-Apprise-ID', '').strip() if uid: kwargs['_uid'] = uid # # Apply Any Global Filters (if identified) # apply_global_filters() # Prepare ourselves a default Asset asset = apprise.AppriseAsset(**kwargs) # Prepare our apprise object a_obj = apprise.Apprise(asset=asset) # Create an apprise config object ac_obj = apprise.AppriseConfig() # Load our configuration ac_obj.add_config(config, format=format) # Add our configuration a_obj.add(ac_obj) # Our return content type can be controlled by the Accept keyword # If it includes /* or /html somewhere then we return html, otherwise # we return the logs as they're processed in their text format. # The HTML response type has a bit of overhead where as it's not # the case with text/plain content_type = \ 'text/html' if re.search(r'text\/(\*|html)', request.headers.get('Accept', ''), re.IGNORECASE) \ else 'text/plain' # Acquire our log level from headers if defined, otherwise use # the global one set in the settings level = request.headers.get( 'X-Apprise-Log-Level', settings.LOGGING['loggers']['apprise']['level']).upper() if level not in ( 'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'TRACE'): level = settings.LOGGING['loggers']['apprise']['level'].upper() # Convert level to it's integer value if level == 'CRITICAL': level = logging.CRITICAL elif level == 'ERROR': level = logging.ERROR elif level == 'WARNING': level = logging.WARNING elif level == 'INFO': level = logging.INFO elif level == 'DEBUG': level = logging.DEBUG elif level == 'TRACE': level = logging.DEBUG - 1 # Initialize our response object response = None esc = '' # Format is only updated if the content_type is html fmt = '
  • ' \ '
    %(asctime)s
    ' \ '
    %(levelname)s
    ' \ f'
    {esc}%(message)s{esc}
  • ' \ if content_type == 'text/html' else \ settings.LOGGING['formatters']['standard']['format'] # Now specify our format (and over-ride the default): with apprise.LogCapture(level=level, fmt=fmt) as logs: # Perform our notification at this point result = a_obj.notify( content.get('body'), title=content.get('title', ''), notify_type=content.get('type', apprise.NotifyType.INFO), tag=content.get('tag'), attach=attach, ) if content_type == 'text/html': # Iterate over our entries so that we can prepare to escape # things to be presented as HTML esc = re.escape(esc) entries = re.findall( r'(?P
  • .*?)' r'{}(?P.+li>$)(?=$|
  • ` tag and escape our message body: response = ''.format( ''.join([e[0] + escape(e[1]) + e[2] for e in entries])) else: # content_type == 'text/plain' response = logs.getvalue() if settings.APPRISE_WEBHOOK_URL: webhook_payload = { 'source': request.META['REMOTE_ADDR'], 'status': 0 if result else 1, 'output': response, } # Send our webhook (pass or fail) send_webhook(webhook_payload) if not result: # If at least one notification couldn't be sent; change up # the response to a 424 error code msg = _('One or more notification could not be sent.') status = ResponseCode.failed_dependency logger.warning( 'NOTIFY - %s - One or more notifications not ' 'sent%s using KEY: %s', request.META['REMOTE_ADDR'], '' if not tag else f' (Tags: {tag})', key) return HttpResponse(response if response else msg, status=status) \ if not json_response else JsonResponse({ 'error': msg, }, encoder=JSONEncoder, safe=False, status=status, ) logger.info( 'NOTIFY - %s - Delivered Notification(s) - %sKEY: %s', request.META['REMOTE_ADDR'], '' if not tag else f'Tags: {tag}, ', key) # Return our retrieved content return HttpResponse( response if response is not None else _('Notification(s) sent.'), content_type=content_type, status=ResponseCode.okay, ) @method_decorator((gzip_page, never_cache), name='dispatch') class StatelessNotifyView(View): """ A Django view for sending a stateless notification """ def post(self, request): """ Handle a POST request """ # our content content = {} if MIME_IS_FORM.match(request.content_type): content = {} form = NotifyByUrlForm(request.POST, request.FILES) if form.is_valid(): content.update(form.cleaned_data) elif MIME_IS_JSON.match(request.content_type): # Prepare our default response try: # load our JSON content content = json.loads(request.body.decode('utf-8')) except (AttributeError, ValueError): # could not parse JSON response... logger.warning( 'NOTIFY - %s - Invalid JSON Payload provided', request.META['REMOTE_ADDR']) return HttpResponse( _('Invalid JSON specified.'), status=ResponseCode.bad_request) if not content: # We could not handle the Content-Type logger.warning( 'NOTIFY - %s - Invalid FORM Payload provided', request.META['REMOTE_ADDR']) return HttpResponse( _('Bad FORM Payload provided.'), status=ResponseCode.bad_request) if not content.get('urls') and settings.APPRISE_STATELESS_URLS: # fallback to settings.APPRISE_STATELESS_URLS if no urls were # defined content['urls'] = settings.APPRISE_STATELESS_URLS # Some basic error checking if not content.get('body') or \ content.get('type', apprise.NotifyType.INFO) \ not in apprise.NOTIFY_TYPES: logger.warning( 'NOTIFY - %s - Payload lacks minimum requirements', request.META['REMOTE_ADDR']) return HttpResponse( _('Payload lacks minimum requirements.'), status=ResponseCode.bad_request) # Acquire our body format (if identified) body_format = content.get('format', apprise.NotifyFormat.TEXT) if body_format and body_format not in apprise.NOTIFY_FORMATS: logger.warning( 'NOTIFY - %s - Format parameter contains an unsupported ' 'value (%s)', request.META['REMOTE_ADDR'], str(body_format)) return HttpResponse( _('An invalid body input format was specified.'), status=ResponseCode.bad_request) # Prepare our keyword arguments (to be passed into an AppriseAsset # object) kwargs = { 'plugin_paths': settings.APPRISE_PLUGIN_PATHS, } if body_format: # Store our defined body format kwargs['body_format'] = body_format # Acquire our recursion count (if defined) recursion = request.headers.get('X-Apprise-Recursion-Count', 0) try: recursion = int(recursion) if recursion < 0: # We do not accept negative numbers raise TypeError("Invalid Recursion Value") if recursion > settings.APPRISE_RECURSION_MAX: logger.warning( 'NOTIFY - %s - Recursion limit reached (%d > %d)', request.META['REMOTE_ADDR'], recursion, settings.APPRISE_RECURSION_MAX) return HttpResponse( _('The recursion limit has been reached.'), status=ResponseCode.method_not_accepted) # Store our recursion value for our AppriseAsset() initialization kwargs['_recursion'] = recursion except (TypeError, ValueError): logger.warning( 'NOTIFY - %s - Invalid recursion value (%s) provided', request.META['REMOTE_ADDR'], str(recursion)) return HttpResponse( _('An invalid recursion value was specified.'), status=ResponseCode.bad_request) # Acquire our unique identifier (if defined) uid = request.headers.get('X-Apprise-ID', '').strip() if uid: kwargs['_uid'] = uid # # Apply Any Global Filters (if identified) # apply_global_filters() # Prepare ourselves a default Asset asset = apprise.AppriseAsset(**kwargs) # Prepare our apprise object a_obj = apprise.Apprise(asset=asset) # Add URLs a_obj.add(content.get('urls')) if not len(a_obj): return HttpResponse( _('There was no services to notify.'), status=ResponseCode.no_content, ) # Handle Attachments attach = None if not content.get('attachment') and 'attachment' in request.POST: # Acquire attachments to work with them content['attachment'] = request.POST.getlist('attachment') if 'attachment' in content or request.FILES: try: attach = parse_attachments( content.get('attachment'), request.FILES) except (TypeError, ValueError): return HttpResponse( _('Bad attachment'), status=ResponseCode.bad_request) # Acquire our log level from headers if defined, otherwise use # the global one set in the settings level = request.headers.get( 'X-Apprise-Log-Level', settings.LOGGING['loggers']['apprise']['level']).upper() if level not in ( 'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'TRACE'): level = settings.LOGGING['loggers']['apprise']['level'].upper() # Convert level to it's integer value if level == 'CRITICAL': level = logging.CRITICAL elif level == 'ERROR': level = logging.ERROR elif level == 'WARNING': level = logging.WARNING elif level == 'INFO': level = logging.INFO elif level == 'DEBUG': level = logging.DEBUG elif level == 'TRACE': level = logging.DEBUG - 1 if settings.APPRISE_WEBHOOK_URL: fmt = settings.LOGGING['formatters']['standard']['format'] with apprise.LogCapture(level=level, fmt=fmt) as logs: # Perform our notification at this point result = a_obj.notify( content.get('body'), title=content.get('title', ''), notify_type=content.get('type', apprise.NotifyType.INFO), tag='all', attach=attach, ) response = logs.getvalue() webhook_payload = { 'source': request.META['REMOTE_ADDR'], 'status': 0 if result else 1, 'output': response, } # Send our webhook (pass or fail) send_webhook(webhook_payload) else: # Perform our notification at this point result = a_obj.notify( content.get('body'), title=content.get('title', ''), notify_type=content.get('type', apprise.NotifyType.INFO), tag='all', attach=attach, ) if not result: # If at least one notification couldn't be sent; change up the # response to a 424 error code return HttpResponse( _('One or more notification could not be sent.'), status=ResponseCode.failed_dependency) logger.info( 'NOTIFY - %s - Delivered Stateless Notification(s)', request.META['REMOTE_ADDR']) # Return our retrieved content return HttpResponse( _('Notification(s) sent.'), status=ResponseCode.okay, ) @method_decorator((gzip_page, never_cache), name='dispatch') class JsonUrlView(View): """ A Django view that lists all loaded tags and URLs for a given key """ def get(self, request, key): """ Handle a POST request """ # Now build our tag response that identifies all of the tags # and the URL's they're associated with # { # "tags": ["tag1', "tag2", "tag3"], # "urls": [ # { # "url": "windows://", # "tags": [], # }, # { # "url": "mailto://user:pass@gmail.com" # "tags": ["tag1", "tag2", "tag3"] # } # ] # } response = { 'tags': set(), 'urls': [], } # Privacy flag # Support 'yes', '1', 'true', 'enable', 'active', and + privacy = settings.APPRISE_CONFIG_LOCK or \ request.GET.get('privacy', 'no')[0].lower() in ( 'a', 'y', '1', 't', 'e', '+') # Optionally filter on tags. Use comma to identify more then one tag = request.GET.get('tag', 'all') config, format = ConfigCache.get(key) if config is None: # The returned value of config and format tell a rather cryptic # story; this portion could probably be updated in the future. # but for now it reads like this: # config == None and format == None: We had an internal error # config == None and format != None: we simply have no data # config != None: we simply have no data if format is not None: # no content to return return JsonResponse( response, encoder=JSONEncoder, safe=False, status=ResponseCode.no_content, ) # Something went very wrong; return 500 response['error'] = _('There was no configuration found.') return JsonResponse( response, encoder=JSONEncoder, safe=False, status=ResponseCode.internal_server_error, ) # Prepare our apprise object a_obj = apprise.Apprise() # Create an apprise config object ac_obj = apprise.AppriseConfig() # Load our configuration ac_obj.add_config(config, format=format) # Add our configuration a_obj.add(ac_obj) for notification in a_obj.find(tag): # Set Notification response['urls'].append({ 'url': notification.url(privacy=privacy), 'tags': notification.tags, }) # Store Tags response['tags'] |= notification.tags # Return our retrieved content return JsonResponse( response, encoder=JSONEncoder, safe=False, status=ResponseCode.okay )