diff --git a/KEYWORDS b/KEYWORDS index ebd312e3..a4753cdc 100644 --- a/KEYWORDS +++ b/KEYWORDS @@ -29,6 +29,7 @@ KODI Kumulos LaMetric Line +Mastodon MacOS Mailgun Matrix diff --git a/README.md b/README.md index a15972a3..cdbb9a11 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ The table below identifies the services this tool supports and some example serv | [LaMetric Time](https://github.com/caronc/apprise/wiki/Notify_lametric) | lametric:// | (TCP) 443 | lametric://apikey@device_ipaddr
lametric://apikey@hostname:port
lametric://client_id@client_secret | [Line](https://github.com/caronc/apprise/wiki/Notify_line) | line:// | (TCP) 443 | line://Token@User
line://Token/User1/User2/UserN | [Mailgun](https://github.com/caronc/apprise/wiki/Notify_mailgun) | mailgun:// | (TCP) 443 | mailgun://user@hostname/apikey
mailgun://user@hostname/apikey/email
mailgun://user@hostname/apikey/email1/email2/emailN
mailgun://user@hostname/apikey/?name="From%20User" +| [Mastodon](https://github.com/caronc/apprise/wiki/Notify_mastodon) | mastodon:// or mastodons://| (TCP) 80 or 443 | mastodon://access_key@hostname
mastodon://access_key@hostname/@user
mastodon://access_key@hostname/@user1/@user2/@userN | [Matrix](https://github.com/caronc/apprise/wiki/Notify_matrix) | matrix:// or matrixs:// | (TCP) 80 or 443 | matrix://hostname
matrix://user@hostname
matrixs://user:pass@hostname:port/#room_alias
matrixs://user:pass@hostname:port/!room_id
matrixs://user:pass@hostname:port/#room_alias/!room_id/#room2
matrixs://token@hostname:port/?webhook=matrix
matrix://user:token@hostname/?webhook=slack&format=markdown | [Mattermost](https://github.com/caronc/apprise/wiki/Notify_mattermost) | mmost:// or mmosts:// | (TCP) 8065 | mmost://hostname/authkey
mmost://hostname:80/authkey
mmost://user@hostname:80/authkey
mmost://hostname/authkey?channel=channel
mmosts://hostname/authkey
mmosts://user@hostname/authkey
| [Microsoft Teams](https://github.com/caronc/apprise/wiki/Notify_msteams) | msteams:// | (TCP) 443 | msteams://TokenA/TokenB/TokenC/ diff --git a/apprise/AppriseAttachment.py b/apprise/AppriseAttachment.py index b808cfae..833edaa0 100644 --- a/apprise/AppriseAttachment.py +++ b/apprise/AppriseAttachment.py @@ -170,6 +170,11 @@ class AppriseAttachment: return_status = False continue + elif isinstance(_attachment, AppriseAttachment): + # We were provided a list of Apprise Attachments + # append our content together + instance = _attachment.attachments + elif not isinstance(_attachment, attachment.AttachBase): logger.warning( "An invalid attachment (type={}) was specified.".format( @@ -196,7 +201,11 @@ class AppriseAttachment: continue # Add our initialized plugin to our server listings - self.attachments.append(instance) + if isinstance(instance, list): + self.attachments.extend(instance) + + else: + self.attachments.append(instance) # Return our status return return_status diff --git a/apprise/plugins/NotifyMastodon.py b/apprise/plugins/NotifyMastodon.py new file mode 100644 index 00000000..90001c1d --- /dev/null +++ b/apprise/plugins/NotifyMastodon.py @@ -0,0 +1,976 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 Chris Caron +# All rights reserved. +# +# This code is licensed under the MIT License. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files(the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions : +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import re +import requests +from copy import deepcopy +from json import dumps, loads +from datetime import datetime + +from .NotifyBase import NotifyBase +from ..URLBase import PrivacyMode +from ..common import NotifyImageSize +from ..common import NotifyFormat +from ..common import NotifyType +from ..utils import parse_list +from ..utils import parse_bool +from ..utils import validate_regex +from ..AppriseLocale import gettext_lazy as _ +from ..attachment.AttachBase import AttachBase + +# Accept: +# - @username +# - username +# - username@host.com +# - @username@host.com +IS_USER = re.compile( + r'^\s*@?(?P[A-Z0-9_]+(?:@(?P[A-Z0-9_.-]+))?)$', re.I) + +USER_DETECTION_RE = re.compile( + r'(@[A-Z0-9_]+(?:@[A-Z0-9_.-]+)?)(?=$|[\s,.&()\[\]]+)', re.I) + + +class MastodonMessageVisibility: + """ + The visibility of any status message made + """ + # post visibility defaults to the accounts default-visibilty setting + DEFAULT = 'default' + + # post will be visible only to mentioned users + # similar to a Twitter DM + DIRECT = 'direct' + + # post will be visible only to followers + PRIVATE = 'private' + + # post will be public but not appear on the public timeline + UNLISTED = 'unlisted' + + # post will be public + PUBLIC = 'public' + + +# Define the types in a list for validation purposes +MASTODON_MESSAGE_VISIBILITIES = ( + MastodonMessageVisibility.DEFAULT, + MastodonMessageVisibility.DIRECT, + MastodonMessageVisibility.PRIVATE, + MastodonMessageVisibility.UNLISTED, + MastodonMessageVisibility.PUBLIC, +) + + +class NotifyMastodon(NotifyBase): + """ + A wrapper for Notify Mastodon Notifications + """ + + # The default descriptive name associated with the Notification + service_name = 'Mastodon' + + # The services URL + service_url = 'https://joinmastodon.org' + + # The default protocol + protocol = ('mastodon', 'toot') + + # The default secure protocol + secure_protocol = ('mastodons', 'toots') + + # A URL that takes you to the setup/help of the specific protocol + setup_url = 'https://github.com/caronc/apprise/wiki/Notify_mastodon' + + # Allows the user to specify the NotifyImageSize object; this is supported + # through the webhook + image_size = NotifyImageSize.XY_128 + + # it is documented on the site that the maximum images per toot + # is 4 (unless it's a GIF, then it's only 1) + __toot_non_gif_images_batch = 4 + + # Mastodon API Reference To Acquire Current Users Information + # See: https://docs.joinmastodon.org/methods/accounts/ + # Requires Scope Element: read:accounts + mastodon_whoami = '/api/v1/accounts/verify_credentials' + + # URL for posting media files + mastodon_media = '/api/v1/media' + + # URL for posting status messages + mastodon_toot = '/api/v1/statuses' + + # URL for posting direct messages + mastodon_dm = '/api/v1/dm' + + # The title is not used + title_maxlen = 0 + + # The maximum size of the message + body_maxlen = 500 + + # Default to text + notify_format = NotifyFormat.TEXT + + # Mastodon is kind enough to return how many more requests we're allowed to + # continue to make within it's header response as: + # X-Rate-Limit-Reset: The epoc time (in seconds) we can expect our + # rate-limit to be reset. + # X-Rate-Limit-Remaining: an integer identifying how many requests we're + # still allow to make. + request_rate_per_sec = 0 + + # For Tracking Purposes + ratelimit_reset = datetime.utcnow() + + # Default to 1000; users can send up to 1000 DM's and 2400 toot a day + # This value only get's adjusted if the server sets it that way + ratelimit_remaining = 1 + + # Define object templates + templates = ( + '{schema}://{token}@{host}', + '{schema}://{token}@{host}:{port}', + '{schema}://{token}@{host}/{targets}', + '{schema}://{token}@{host}:{port}/{targets}', + ) + + # Define our template arguments + template_tokens = dict(NotifyBase.template_tokens, **{ + 'host': { + 'name': _('Hostname'), + 'type': 'string', + 'required': True, + }, + 'token': { + 'name': _('Access Token'), + 'type': 'string', + 'required': True, + }, + 'port': { + 'name': _('Port'), + 'type': 'int', + 'min': 1, + 'max': 65535, + }, + 'target_user': { + 'name': _('Target User'), + 'type': 'string', + 'prefix': '@', + 'map_to': 'targets', + }, + 'targets': { + 'name': _('Targets'), + 'type': 'list:string', + }, + }) + + # Define our template arguments + template_args = dict(NotifyBase.template_args, **{ + 'token': { + 'alias_of': 'token', + }, + 'visibility': { + 'name': _('Visibility'), + 'type': 'choice:string', + 'values': MASTODON_MESSAGE_VISIBILITIES, + 'default': MastodonMessageVisibility.DEFAULT, + }, + 'cache': { + 'name': _('Cache Results'), + 'type': 'bool', + 'default': True, + }, + 'batch': { + 'name': _('Batch Mode'), + 'type': 'bool', + 'default': True, + }, + 'sensitive': { + 'name': _('Sensitive Attachments'), + 'type': 'bool', + 'default': False, + }, + 'spoiler': { + 'name': _('Spoiler Text'), + 'type': 'string', + }, + 'key': { + 'name': _('Idempotency-Key'), + 'type': 'string', + }, + 'language': { + 'name': _('Language Code'), + 'type': 'string', + }, + 'to': { + 'alias_of': 'targets', + }, + }) + + def __init__(self, token=None, targets=None, batch=True, + sensitive=None, spoiler=None, visibility=None, cache=True, + key=None, language=None, **kwargs): + """ + Initialize Notify Mastodon Object + """ + super().__init__(**kwargs) + + # Set our schema + self.schema = 'https' if self.secure else 'http' + + # Initialize our cache value + self._whoami_cache = None + + self.token = validate_regex(token) + if not self.token: + msg = 'An invalid Twitter Consumer Key was specified.' + self.logger.warning(msg) + raise TypeError(msg) + + if visibility: + # Input is a string; attempt to get the lookup from our + # sound mapping + vis = 'invalid' if not isinstance(visibility, str) \ + else visibility.lower().strip() + + # This little bit of black magic allows us to match against + # against multiple versions of the same string + # ... etc + self.visibility = \ + next((v for v in MASTODON_MESSAGE_VISIBILITIES + if v.startswith(vis)), None) + + if self.visibility not in MASTODON_MESSAGE_VISIBILITIES: + msg = 'The Mastodon visibility specified ({}) is invalid.' \ + .format(visibility) + self.logger.warning(msg) + raise TypeError(msg) + + else: + self.visibility = \ + self.template_args['visibility']['default'] + + # Prepare our URL + self.api_url = '%s://%s' % (self.schema, self.host) + + if isinstance(self.port, int): + self.api_url += ':%d' % self.port + + # Set Cache Flag + self.cache = cache + + # Prepare Image Batch Mode Flag + self.batch = self.template_args['batch']['default'] \ + if batch is None else batch + + # Images to be marked sensitive + self.sensitive = self.template_args['sensitive']['default'] \ + if sensitive is None else sensitive + + # Text marked as being a spoiler + self.spoiler = spoiler if isinstance(spoiler, str) else None + + # Idempotency Key + self.idempotency_key = key if isinstance(key, str) else None + + # Over-ride default language (ISO 639) (e.g: en, fr, es, etc) + self.language = language if isinstance(language, str) else None + + # Our target users + self.targets = [] + + # Track any errors + has_error = False + + # Identify our targets + for target in parse_list(targets): + match = IS_USER.match(target) + if match and match.group('user'): + self.targets.append('@' + match.group('user')) + continue + + has_error = True + self.logger.warning( + 'Dropped invalid Mastodon user ({}) specified.'.format(target), + ) + + if has_error and not self.targets: + # We have specified that we want to notify one or more individual + # and we failed to load any of them. Since it's also valid to + # notify no one at all (which means we notify ourselves), it's + # important we don't switch from the users original intentions + msg = 'No Mastodon targets to notify.' + self.logger.warning(msg) + raise TypeError(msg) + + return + + def url(self, privacy=False, *args, **kwargs): + """ + Returns the URL built dynamically based on specified arguments. + """ + + # Define any URL parameters + params = { + 'visibility': self.visibility, + 'batch': 'yes' if self.batch else 'no', + 'sensitive': 'yes' if self.sensitive else 'no', + } + + # Extend our parameters + params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) + + if self.spoiler: + # Our Spoiler if one was specified + params['spoiler'] = self.spoiler + + if self.idempotency_key: + # Our Idempotency Key + params['key'] = self.idempotency_key + + if self.language: + # Override Language + params['language'] = self.language + + default_port = 443 if self.secure else 80 + + return '{schema}://{token}@{host}{port}/{targets}?{params}'.format( + schema=self.secure_protocol[0] + if self.secure else self.protocol[0], + token=self.pprint( + self.token, privacy, mode=PrivacyMode.Secret, safe=''), + # never encode hostname since we're expecting it to be a valid one + host=self.host, + port='' if self.port is None or self.port == default_port + else ':{}'.format(self.port), + targets='/'.join( + [NotifyMastodon.quote(x, safe='@') for x in self.targets]), + params=NotifyMastodon.urlencode(params), + ) + + def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, + **kwargs): + """ + wrapper to _send since we can alert more then one channel + """ + + # Build a list of our attachments + attachments = [] + + # Smart Target Detection for Direct Messages; this prevents us from + # adding @user entries that were already placed in the message body + users = set(USER_DETECTION_RE.findall(body)) + targets = users - set(self.targets.copy()) + if not self.targets and self.visibility == \ + MastodonMessageVisibility.DIRECT: + + result = self._whoami() + if not result: + # Could not access our status + return False + + myself = '@' + next(iter(result.keys())) + if myself in users: + targets.remove(myself) + + else: + targets.add(myself) + + if attach: + # We need to upload our payload first so that we can source it + # in remaining messages + for attachment in attach: + + # Perform some simple error checking + if not attachment: + # We could not access the attachment + self.logger.error( + 'Could not access attachment {}.'.format( + attachment.url(privacy=True))) + return False + + # + # Images (PNG, JPG, GIF) up to 8MB. + # - Images will be downscaled to 1.6 megapixels (enough for a + # 1280x1280 image). + # - Up to 4 images can be attached. + # - Animated GIFs are converted to soundless MP4s like on + # Imgur/Gfycat (GIFV). + # - You can also upload soundless MP4 and WebM, which will + # be handled the same way. + # Videos (MP4, M4V, MOV, WebM) up to 40MB. + # - Video will be transcoded to H.264 MP4 with a maximum + # bitrate of 1300kbps and framerate of 60fps. + # Audio (MP3, OGG, WAV, FLAC, OPUS, AAC, M4A, 3GP) up to 40MB. + # - Audio will be transcoded to MP3 using V2 VBR (roughly + # 192kbps). + if not re.match(r'^(image|video|audio)/.*', + attachment.mimetype, re.I): + # Only support images at this time + self.logger.warning( + 'Ignoring unsupported Mastodon attachment {}.'.format( + attachment.url(privacy=True))) + continue + + self.logger.debug( + 'Preparing Mastodon attachment {}'.format( + attachment.url(privacy=True))) + + # Upload our image and get our id associated with it + postokay, response = self._request( + self.mastodon_media, + payload=attachment, + ) + + if not postokay: + # We can't post our attachment + if response and 'authorized scopes' \ + in response.get('error', ''): + self.logger.warning( + 'Failed to Send Attachment to Mastodon: ' + 'missing scope: write:media') + + # All other failures should cause us to abort + return False + + if not (isinstance(response, dict) + and response.get('id')): + self.logger.debug( + 'Could not attach the file to Mastodon: %s (mime=%s)', + attachment.name, attachment.mimetype) + continue + + # If we get here, our output will look something like this: + # { + # 'id': '12345', + # 'type': 'image', + # 'url': 'https://.../6dad4663a.jpeg', + # 'preview_url': 'https://.../adde6dad4663a.jpeg', + # 'remote_url': None, + # 'preview_remote_url': None, + # 'text_url': None, + # 'meta': { + # 'original': { + # 'width': 640, + # 'height': 640, + # 'size': '640x640', + # 'aspect': 1.0 + # }, + # 'small': { + # 'width': 400, + # 'height': 400, + # 'size': '400x400', + # 'aspect': 1.0 + # } + # }, + # 'description': None, + # 'blurhash': 'UmIsdJnT^mX4V@XQofnQ~Ebq%4o3ofnQjZbt' + # } + response.update({ + # Update our response to additionally include the + # attachment details + 'file_name': attachment.name, + 'file_mime': attachment.mimetype, + 'file_path': attachment.path, + }) + + # Save our pre-prepared payload for attachment posting + attachments.append(response) + + payload = { + 'status': '{} {}'.format(' '.join(targets), body) + if targets else body, + 'sensitive': self.sensitive, + } + + # Handle Visibility Flag + if self.visibility != MastodonMessageVisibility.DEFAULT: + payload['visibility'] = self.visibility + + # Set Spoiler text (if set) + if self.spoiler: + payload['spoiler_text'] = self.spoiler + + # Set Idempotency-Key (if set) + if self.idempotency_key: + payload['Idempotency-Key'] = self.idempotency_key + + # Set Language + if self.language: + payload['language'] = self.language + + payloads = [] + if not attachments: + payloads.append(payload) + + else: + # Group our images if batch is set to do so + batch_size = 1 if not self.batch \ + else self.__toot_non_gif_images_batch + + # Track our batch control in our message generation + batches = [] + batch = [] + for attachment in attachments: + batch.append(attachment['id']) + # Mastodon supports batching images together. This allows + # the batching of multiple images together. Mastodon also + # makes it clear that you can't batch `gif` files; they need + # to be separate. So the below preserves the ordering that + # a user passed their attachments in. if 4-non-gif images + # are passed, they are all part of a single message. + # + # however, if they pass in image, gif, image, gif. The + # gif's inbetween break apart the batches so this would + # produce 4 separate toots. + # + # If you passed in, image, image, gif, image. <- This would + # produce 3 images (as the first 2 images could be lumped + # together as a batch) + if not re.match( + r'^image/(png|jpe?g)', attachment['file_mime'], re.I) \ + or len(batch) >= batch_size: + batches.append(batch) + batch = [] + + if batch: + batches.append(batch) + + for no, media_ids in enumerate(batches): + _payload = deepcopy(payload) + _payload['media_ids'] = media_ids + + if no: + # strip text and replace it with the image representation + _payload['status'] = \ + '{:02d}/{:02d}'.format(no + 1, len(batches)) + # No longer sensitive information + _payload['sensitive'] = False + if self.idempotency_key: + # Support multiposts while a Idempotency Key has been + # defined + _payload['Idempotency-Key'] = '{}-part{:02d}'.format( + self.idempotency_key, no) + payloads.append(_payload) + + # Error Tracking + has_error = False + + for no, payload in enumerate(payloads, start=1): + postokay, response = self._request(self.mastodon_toot, payload) + if not postokay: + # Track our error + has_error = True + + # We can't post our attachment + if response and 'authorized scopes' \ + in response.get('error', ''): + self.logger.warning( + 'Failed to Send Status to Mastodon: ' + 'missing scope: write:statuses') + + continue + + # Example Attachment Output: + # { + # "id":"109315796435904505", + # "created_at":"2022-11-09T20:44:39.017Z", + # "in_reply_to_id":null, + # "in_reply_to_account_id":null, + # "sensitive":false, + # "spoiler_text":"", + # "visibility":"public", + # "language":"en", + # "uri":"https://host/users/caronc/statuses/109315796435904505", + # "url":"https://host/@caronc/109315796435904505", + # "replies_count":0, + # "reblogs_count":0, + # "favourites_count":0, + # "edited_at":null, + # "favourited":false, + # "reblogged":false, + # "muted":false, + # "bookmarked":false, + # "pinned":false, + # "content":"

test

", + # "reblog":null, + # "application":{ + # "name":"Apprise Notifications", + # "website":"https://github.com/caronc/apprise" + # }, + # "account":{ + # "id":"109310334138718878", + # "username":"caronc", + # "acct":"caronc", + # "display_name":"Chris", + # "locked":false, + # "bot":false, + # "discoverable":false, + # "group":false, + # "created_at":"2022-11-08T00:00:00.000Z", + # "note":"content", + # "url":"https://host/@caronc", + # "avatar":"https://host/path/file.png", + # "avatar_static":"https://host/path/file.png", + # "header":"https://host/headers/original/missing.png", + # "header_static":"https://host/path/missing.png", + # "followers_count":0, + # "following_count":0, + # "statuses_count":15, + # "last_status_at":"2022-11-09", + # "emojis":[ + # + # ], + # "fields":[ + # + # ] + # }, + # "media_attachments":[ + # { + # "id":"109315796405707501", + # "type":"image", + # "url":"https://host/path/file.jpeg", + # "preview_url":"https://host/path/file.jpeg", + # "remote_url":null, + # "preview_remote_url":null, + # "text_url":null, + # "meta":{ + # "original":{ + # "width":640, + # "height":640, + # "size":"640x640", + # "aspect":1.0 + # }, + # "small":{ + # "width":400, + # "height":400, + # "size":"400x400", + # "aspect":1.0 + # } + # }, + # "description":null, + # "blurhash":"UmIsdJnT^mX4V@XQofnQ~Ebq%4o3ofnQjZbt" + # } + # ], + # "mentions":[ + # + # ], + # "tags":[ + # + # ], + # "emojis":[ + # + # ], + # "card":null, + # "poll":null + # } + + try: + url = '{}/web/@{}'.format( + self.api_url, + response['account']['username']) + + except (KeyError, TypeError): + url = 'unknown' + + self.logger.debug( + 'Mastodon [%.2d/%.2d] (%d attached) delivered to %s', + no, len(payloads), len(payload.get('media_ids', [])), url) + + self.logger.info( + 'Sent [%.2d/%.2d] Mastodon notification as public toot.', + no, len(payloads)) + + return not has_error + + def _whoami(self, lazy=True): + """ + Looks details of current authenticated user + + """ + + if lazy and self._whoami_cache is not None: + # Use cached response + return self._whoami_cache + + # Send Mastodon Whoami request + postokay, response = self._request( + self.mastodon_whoami, + method='GET', + ) + + if postokay: + # Sample Response: + # { + # 'id': '12345', + # 'username': 'caronc', + # 'acct': 'caronc', + # 'display_name': 'Chris', + # 'locked': False, + # 'bot': False, + # 'discoverable': False, + # 'group': False, + # 'created_at': '2022-11-08T00:00:00.000Z', + # 'note': 'details', + # 'url': 'https://noc.social/@caronc', + # 'avatar': 'https://host/path/image.png', + # 'avatar_static': 'https://host/path/image.png', + # 'header': 'https://host/path/missing.png', + # 'header_static': 'https://host/path/missing.png', + # 'followers_count': 0, + # 'following_count': 0, + # 'statuses_count': 2, + # 'last_status_at': '2022-11-09', + # 'source': { + # 'privacy': 'public', + # 'sensitive': False, + # 'language': None, + # 'note': 'details', + # 'fields': [], + # 'follow_requests_count': 0 + # }, + # 'emojis': [], + # 'fields': [] + # } + try: + # Cache our response for future references + self._whoami_cache = { + response['username']: response['id']} + + except (TypeError, KeyError): + pass + + elif response and 'authorized scopes' in response.get('error', ''): + self.logger.warning( + 'Failed to lookup Mastodon Auth details; ' + 'missing scope: read:accounts') + + return self._whoami_cache if postokay else {} + + def _request(self, path, payload=None, method='POST'): + """ + Wrapper to Mastodon API requests object + """ + + headers = { + 'User-Agent': self.app_id, + 'Authorization': f'Bearer {self.token}', + } + + data = None + files = None + + # Prepare our message + url = '{}{}'.format(self.api_url, path) + + # Some Debug Logging + self.logger.debug('Mastodon {} URL: {} (cert_verify={})'.format( + method, url, self.verify_certificate)) + + # Open our attachment path if required: + if isinstance(payload, AttachBase): + # prepare payload + files = { + 'file': (payload.name, open(payload.path, 'rb'), + 'application/octet-stream')} + + # Provide a description + data = { + 'description': payload.name, + } + + else: + headers['Content-Type'] = 'application/json' + data = dumps(payload) + self.logger.debug('Mastodon Payload: %s' % str(payload)) + + # Default content response object + content = {} + + # By default set wait to None + wait = None + + if self.ratelimit_remaining == 0: + # Determine how long we should wait for or if we should wait at + # all. This isn't fool-proof because we can't be sure the client + # time (calling this script) is completely synced up with the + # Mastodon server. One would hope we're on NTP and our clocks are + # the same allowing this to role smoothly: + + now = datetime.utcnow() + if now < self.ratelimit_reset: + # We need to throttle for the difference in seconds + # We add 0.5 seconds to the end just to allow a grace + # period. + wait = (self.ratelimit_reset - now).total_seconds() + 0.5 + + # Always call throttle before any remote server i/o is made; + self.throttle(wait=wait) + + # acquire our request mode + fn = requests.post if method == 'POST' else requests.get + + try: + r = fn( + url, + data=data, + files=files, + headers=headers, + verify=self.verify_certificate, + timeout=self.request_timeout, + ) + + try: + content = loads(r.content) + + except (AttributeError, TypeError, ValueError): + # ValueError = r.content is Unparsable + # TypeError = r.content is None + # AttributeError = r is None + content = {} + + if r.status_code not in ( + requests.codes.ok, requests.codes.created, + requests.codes.accepted): + + # We had a problem + status_str = \ + NotifyMastodon.http_response_code_lookup(r.status_code) + + self.logger.warning( + 'Failed to send Mastodon {} to {}: ' + '{}error={}.'.format( + method, + url, + ', ' if status_str else '', + r.status_code)) + + self.logger.debug( + 'Response Details:\r\n{}'.format(r.content)) + + # Mark our failure + return (False, content) + + try: + # Capture rate limiting if possible + self.ratelimit_remaining = \ + int(r.headers.get('X-RateLimit-Remaining')) + self.ratelimit_reset = datetime.utcfromtimestamp( + int(r.headers.get('X-RateLimit-Limit'))) + + except (TypeError, ValueError): + # This is returned if we could not retrieve this information + # gracefully accept this state and move on + pass + + except requests.RequestException as e: + self.logger.warning( + 'Exception received when sending Mastodon {} to {}: '. + format(method, url)) + self.logger.debug('Socket Exception: %s' % str(e)) + + # Mark our failure + return (False, content) + + except (OSError, IOError) as e: + self.logger.warning( + 'An I/O error occurred while handling {}.'.format( + payload.name if isinstance(payload, AttachBase) + else payload)) + self.logger.debug('I/O Exception: %s' % str(e)) + return (False, content) + + finally: + # Close our file (if it's open) stored in the second element + # of our files tuple (index 1) + if files: + files['file'][1].close() + + return (True, content) + + @staticmethod + def parse_url(url): + """ + Parses the URL and returns enough arguments that can allow + us to re-instantiate this object. + + """ + results = NotifyBase.parse_url(url) + if not results: + # We're done early as we couldn't load the results + return results + + if 'token' in results['qsd'] and len(results['qsd']['token']): + results['token'] = NotifyMastodon.unquote(results['qsd']['token']) + + elif not results['password'] and results['user']: + results['token'] = NotifyMastodon.unquote(results['user']) + + # Apply our targets + results['targets'] = NotifyMastodon.split_path(results['fullpath']) + + # The defined Mastodon visibility + if 'visibility' in results['qsd'] and \ + len(results['qsd']['visibility']): + # Simplified version + results['visibility'] = \ + NotifyMastodon.unquote(results['qsd']['visibility']) + + elif results['schema'].startswith('toot'): + results['visibility'] = MastodonMessageVisibility.PUBLIC + + # Get Idempotency Key (if specified) + if 'key' in results['qsd'] and len(results['qsd']['key']): + results['key'] = \ + NotifyMastodon.unquote(results['qsd']['key']) + + # Get Spoiler Text + if 'spoiler' in results['qsd'] and len(results['qsd']['spoiler']): + results['spoiler'] = \ + NotifyMastodon.unquote(results['qsd']['spoiler']) + + # Get Language (if specified) + if 'language' in results['qsd'] and len(results['qsd']['language']): + results['language'] = \ + NotifyMastodon.unquote(results['qsd']['language']) + + # Get Sensitive Flag (for Attachments) + results['sensitive'] = \ + parse_bool(results['qsd'].get( + 'sensitive', + NotifyMastodon.template_args['sensitive']['default'])) + + # Get Batch Mode Flag + results['batch'] = \ + parse_bool(results['qsd'].get( + 'batch', NotifyMastodon.template_args['batch']['default'])) + + # The 'to' makes it easier to use yaml configuration + if 'to' in results['qsd'] and len(results['qsd']['to']): + results['targets'] += \ + NotifyMastodon.parse_list(results['qsd']['to']) + + return results diff --git a/packaging/redhat/python-apprise.spec b/packaging/redhat/python-apprise.spec index 1b3e511c..1eae7512 100644 --- a/packaging/redhat/python-apprise.spec +++ b/packaging/redhat/python-apprise.spec @@ -39,10 +39,10 @@ Apprise API, AWS SES, AWS SNS, Bark, BulkSMS, Boxcar, ClickSend, DAPNET, DingTalk, Discord, E-Mail, Emby, Faast, FCM, Flock, Gitter, Google Chat, Gotify, Growl, Guilded, Home Assistant, IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, Line, MacOSX, Mailgun, Mattermost, Matrix, Microsoft Windows, -Microsoft Teams, MessageBird, MQTT, MSG91, MyAndroid, Nexmo, Nextcloud, -NextcloudTalk, Notica, Notifico, ntfy, Office365, OneSignal, Opsgenie, -PagerDuty, ParsePlatform, PopcornNotify, Prowl, Pushalot, PushBullet, Pushjet, -Pushover, PushSafer, Reddit, Rocket.Chat, SendGrid, ServerChan, Signal, +Mastodon, Microsoft Teams, MessageBird, MQTT, MSG91, MyAndroid, Nexmo, +Nextcloud, NextcloudTalk, Notica, Notifico, ntfy, Office365, OneSignal, +Opsgenie, PagerDuty, ParsePlatform, PopcornNotify, Prowl, Pushalot, PushBullet, +Pushjet, Pushover, PushSafer, Reddit, Rocket.Chat, SendGrid, ServerChan, Signal, SimplePush, Sinch, Slack, SMSEagle, SMTP2Go, Spontit, SparkPost, Super Toasty, Streamlabs, Stride, Syslog, Techulus Push, Telegram, Twilio, Twitter, Twist, XBMC, Vonage, Webex Teams} diff --git a/test/test_plugin_mastodon.py b/test/test_plugin_mastodon.py new file mode 100644 index 00000000..8e386d21 --- /dev/null +++ b/test/test_plugin_mastodon.py @@ -0,0 +1,747 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 Chris Caron +# All rights reserved. +# +# This code is licensed under the MIT License. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files(the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions : +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import os +from unittest import mock + +import requests +from json import dumps, loads +from datetime import datetime +from apprise import Apprise +from apprise import NotifyType +from apprise import AppriseAttachment +from apprise.plugins.NotifyMastodon import NotifyMastodon +from helpers import AppriseURLTester + +# Disable logging for a cleaner testing output +import logging +logging.disable(logging.CRITICAL) + +# Attachment Directory +TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') + +# Our Testing URLs +apprise_url_tests = ( + ################################## + # NotifyMastodon + ################################## + ('mastodon://', { + # Missing Everything :) + 'instance': None, + }), + ('mastodon://:@/', { + 'instance': None, + }), + ('mastodon://hostname', { + # Missing Access Token + 'instance': TypeError, + }), + ('toot://access_token@hostname', { + # We're good; it's a simple notification + 'instance': NotifyMastodon, + }), + ('toots://access_token@hostname', { + # We're good; it's another simple notification + 'instance': NotifyMastodon, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'mastodons://****@hostname/' + }), + ('mastodon://access_token@hostname/@user/@user2', { + # We're good; it's another simple notification + 'instance': NotifyMastodon, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'mastodon://****@hostname/@user/@user2' + }), + ('mastodon://hostname/@user/@user2?token=abcd123', { + # Our access token can be provided as a token= variable + 'instance': NotifyMastodon, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'mastodon://****@hostname/@user/@user2' + }), + ('mastodon://access_token@hostname?to=@user, @user2', { + # We're good; it's another simple notification + 'instance': NotifyMastodon, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'mastodon://****@hostname/@user/@user2' + }), + ('mastodon://access_token@hostname/?cache=no', { + # disable cache as a test + 'instance': NotifyMastodon, + }), + ('mastodon://access_token@hostname/?spoiler=spoiler%20text', { + # a public post + 'instance': NotifyMastodon, + }), + ('mastodon://access_token@hostname/?language=en', { + # over-ride our language + 'instance': NotifyMastodon, + }), + ('mastodons://access_token@hostname:8443', { + # A custom port specified + 'instance': NotifyMastodon, + }), + ('mastodon://access_token@hostname/?key=My%20Idempotency%20Key', { + # Prevent duplicate submissions of the same status. Idempotency + # keys are stored for up to 1 hour, and can be any arbitrary string. + # Consider using a hash or UUID generated client-side. + 'instance': NotifyMastodon, + }), + ('mastodon://access_token@hostname/-/%/', { + # Invalid users specified + 'instance': TypeError, + }), + ('mastodon://access_token@hostname?visibility=invalid', { + # An invalid visibility + 'instance': TypeError, + }), + ('mastodon://access_token@hostname?visibility=direct', { + # A direct message + 'instance': NotifyMastodon, + # Expected notify() response False (because we won't + # get the response we were expecting from the upstream + # server + 'notify_response': False, + }), + ('mastodon://access_token@hostname?visibility=direct', { + # A direct message + 'instance': NotifyMastodon, + # Provide a response that allows us to look our content up + 'requests_response_text': { + 'id': '12345', + 'username': 'test', + }, + }), + ('toots://access_token@hostname', { + 'instance': NotifyMastodon, + # Throws a series of connection and transfer exceptions when this flag + # is set and tests that we gracfully handle them + 'test_requests_exceptions': True, + }), + ('mastodons://access_token@hostname', { + 'instance': NotifyMastodon, + # Throws a series of connection and transfer exceptions when this flag + # is set and tests that we gracfully handle them + 'test_requests_exceptions': True, + }), +) + + +def test_plugin_mastodon_urls(): + """ + NotifyMastodon() Apprise URLs + + """ + + # Run our general tests + AppriseURLTester(tests=apprise_url_tests).run_all() + + +@mock.patch('requests.get') +@mock.patch('requests.post') +def test_plugin_mastodon_general(mock_post, mock_get): + """ + NotifyMastodon() General Tests + + """ + token = 'access_key' + host = 'nuxref.com' + + response_obj = { + 'username': 'caronc', + 'id': 1234, + } + + # Epoch time: + epoch = datetime.utcfromtimestamp(0) + + request = mock.Mock() + request.content = dumps(response_obj) + request.status_code = requests.codes.ok + request.headers = { + 'X-RateLimit-Limit': (datetime.utcnow() - epoch).total_seconds(), + 'X-RateLimit-Remaining': 1, + } + + # Prepare Mock + mock_get.return_value = request + mock_post.return_value = request + + # Instantiate our object + obj = NotifyMastodon(token=token, host=host) + + assert isinstance(obj, NotifyMastodon) is True + assert isinstance(obj.url(), str) is True + + # apprise room was found + assert obj.send(body="test") is True + + # Change our status code and try again + request.status_code = 403 + assert obj.send(body="test") is False + assert obj.ratelimit_remaining == 1 + + # Return the status + request.status_code = requests.codes.ok + # Force a reset + request.headers['X-RateLimit-Remaining'] = 0 + # behind the scenes, it should cause us to update our rate limit + assert obj.send(body="test") is True + assert obj.ratelimit_remaining == 0 + + # This should cause us to block + request.headers['X-RateLimit-Remaining'] = 10 + assert obj.send(body="test") is True + assert obj.ratelimit_remaining == 10 + + # Handle cases where we simply couldn't get this field + del request.headers['X-RateLimit-Remaining'] + assert obj.send(body="test") is True + # It remains set to the last value + assert obj.ratelimit_remaining == 10 + + # Reset our variable back to 1 + request.headers['X-RateLimit-Remaining'] = 1 + + # Handle cases where our epoch time is wrong + del request.headers['X-RateLimit-Limit'] + assert obj.send(body="test") is True + + # Return our object, but place it in the future forcing us to block + request.headers['X-RateLimit-Limit'] = \ + (datetime.utcnow() - epoch).total_seconds() + 1 + request.headers['X-RateLimit-Remaining'] = 0 + obj.ratelimit_remaining = 0 + assert obj.send(body="test") is True + + # Return our object, but place it in the future forcing us to block + request.headers['X-RateLimit-Limit'] = \ + (datetime.utcnow() - epoch).total_seconds() - 1 + request.headers['X-RateLimit-Remaining'] = 0 + obj.ratelimit_remaining = 0 + assert obj.send(body="test") is True + + # Return our limits to always work + request.headers['X-RateLimit-Limit'] = \ + (datetime.utcnow() - epoch).total_seconds() + request.headers['X-RateLimit-Remaining'] = 1 + obj.ratelimit_remaining = 1 + + # Alter pending targets + obj.targets.append('usera') + request.content = dumps(response_obj) + response_obj = { + 'username': 'usera', + 'id': 4321, + } + + # Cause content response to be None + request.content = None + assert obj.send(body="test") is True + + # Invalid JSON + request.content = '{' + assert obj.send(body="test") is True + + # Return it to a parseable string + request.content = '{}' + + results = NotifyMastodon.parse_url( + 'mastodon://{}@{}/@user?visbility=direct'.format(token, host)) + assert isinstance(results, dict) is True + assert '@user' in results['targets'] + + # cause a json parsing issue now + response_obj = None + assert obj.send(body="test") is True + + response_obj = '{' + assert obj.send(body="test") is True + + mock_get.reset_mock() + mock_post.reset_mock() + + # + # Test our lazy lookups + # + + # Prepare a good status response + request = mock.Mock() + request.content = dumps({'id': '1234', 'username': 'caronc'}) + request.status_code = requests.codes.ok + mock_get.return_value = request + + mastodon_url = 'mastodons://key@host?visibility=direct' + obj = Apprise.instantiate(mastodon_url) + obj._whoami(lazy=True) + assert mock_get.call_count == 1 + assert mock_get.call_args_list[0][0][0] == \ + 'https://host/api/v1/accounts/verify_credentials' + + mock_get.reset_mock() + obj._whoami(lazy=True) + assert mock_get.call_count == 0 + + mock_get.reset_mock() + obj._whoami(lazy=False) + assert mock_get.call_count == 1 + assert mock_get.call_args_list[0][0][0] == \ + 'https://host/api/v1/accounts/verify_credentials' + + +@mock.patch('requests.post') +@mock.patch('requests.get') +def test_plugin_mastodon_attachments(mock_get, mock_post): + """ + NotifyMastodon() Toot Attachment Checks + + """ + akey = 'access_key' + host = 'nuxref.com' + username = 'caronc' + + # Prepare a good status response + good_response_obj = { + 'id': '1234', + } + + good_response = mock.Mock() + good_response.content = dumps(good_response_obj) + good_response.status_code = requests.codes.ok + + # Prepare a good whoami response + good_whoami_response_obj = { + 'username': username, + 'id': '9876', + } + + good_whoami_response = mock.Mock() + good_whoami_response.content = dumps(good_whoami_response_obj) + good_whoami_response.status_code = requests.codes.ok + + # Prepare bad response + bad_response = mock.Mock() + bad_response.content = dumps({}) + bad_response.status_code = requests.codes.internal_server_error + + # Prepare a good media response + good_media_response = mock.Mock() + good_media_response.content = dumps({ + "id": '710511363345354753', + "file_mime": "image/jpeg", + }) + good_media_response.status_code = requests.codes.ok + + # + # Start testing using fixtures above + # + mock_post.side_effect = [good_media_response, good_response] + mock_get.return_value = good_whoami_response + + mastodon_url = 'mastodon://{}@{}'.format(akey, host) + + # attach our content + attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif')) + + # instantiate our object + obj = Apprise.instantiate(mastodon_url) + + # Send our notification + assert obj.notify( + body='body', title='title', notify_type=NotifyType.INFO, + attach=attach) is True + + # Test our call count + assert mock_get.call_count == 0 + assert mock_post.call_count == 2 + assert mock_post.call_args_list[0][0][0] == \ + 'http://nuxref.com/api/v1/media' + assert mock_post.call_args_list[1][0][0] == \ + 'http://nuxref.com/api/v1/statuses' + + # Test our media payload + assert 'files' in mock_post.call_args_list[0][1] + assert 'file' in mock_post.call_args_list[0][1]['files'] + assert mock_post.call_args_list[0][1]['files']['file'][0] \ + == 'apprise-test.gif' + + # Test our status payload + payload = loads(mock_post.call_args_list[1][1]['data']) + assert 'status' in payload + assert payload['status'] == 'title\r\nbody' + assert 'sensitive' in payload + assert payload['sensitive'] is False + assert 'media_ids' in payload + assert isinstance(payload['media_ids'], list) + assert len(payload['media_ids']) == 1 + assert payload['media_ids'][0] == '710511363345354753' + + # Verify we don't set incorrect keys not otherwise specified + assert 'spoiler_text' not in payload + + mock_get.reset_mock() + mock_post.reset_mock() + + # + # Handle the query again, but this time perform a direct message + # requiring us to look up who we are + # + mock_post.side_effect = [good_media_response, good_response] + mock_get.return_value = good_whoami_response + + mastodon_url = 'mastodon://{}@{}?visibility=direct'.format(akey, host) + + # attach our content + attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif')) + + # instantiate our object + obj = Apprise.instantiate(mastodon_url) + + # Send our notification + assert obj.notify( + body='body', title='title', notify_type=NotifyType.INFO, + attach=attach) is True + + # Test our call count + assert mock_get.call_count == 1 + assert mock_post.call_count == 2 + assert mock_get.call_args_list[0][0][0] == \ + 'http://nuxref.com/api/v1/accounts/verify_credentials' + assert mock_post.call_args_list[0][0][0] == \ + 'http://nuxref.com/api/v1/media' + assert mock_post.call_args_list[1][0][0] == \ + 'http://nuxref.com/api/v1/statuses' + + # Test our status payload + payload = loads(mock_post.call_args_list[1][1]['data']) + assert 'status' in payload + # Our ID was added into the payload + assert payload['status'] == '@caronc title\r\nbody' + assert 'sensitive' in payload + assert payload['sensitive'] is False + assert 'media_ids' in payload + assert isinstance(payload['media_ids'], list) + assert len(payload['media_ids']) == 1 + assert payload['media_ids'][0] == '710511363345354753' + + mock_get.reset_mock() + mock_post.reset_mock() + + # Store 3 attachments + attach = ( + AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif')), + AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.png')), + AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg')), + ) + + # Prepare a good media response + mr1 = mock.Mock() + mr1.content = dumps({ + "id": '1', + "file_mime": "image/gif", + }) + mr1.status_code = requests.codes.ok + + mr2 = mock.Mock() + mr2.content = dumps({ + "id": '2', + "file_mime": "image/png", + }) + mr2.status_code = requests.codes.ok + + mr3 = mock.Mock() + mr3.content = dumps({ + "id": '3', + "file_mime": "image/jpeg", + }) + mr3.status_code = requests.codes.ok + + # Return 3 good uploads and a good status response + mock_post.side_effect = [mr1, mr2, mr3, good_response, good_response] + mock_get.return_value = good_whoami_response + + # instantiate our object + mastodon_url = 'mastodons://{}@{}' \ + '?visibility=direct&sensitive=yes&key=abcd'.format(akey, host) + obj = Apprise.instantiate(mastodon_url) + + # Send ourselves a direct message where our ID was already found + # in the body. This smart detection method will prevent us from + # adding the @caronc to the begining of the same message (since it's a + # direct message) + assert obj.notify( + body='Check this out @caronc', title='Apprise', + notify_type=NotifyType.INFO, + attach=attach) is True + + # Test our call count + assert mock_get.call_count == 1 + assert mock_post.call_count == 5 + assert mock_post.call_args_list[0][0][0] == \ + 'https://nuxref.com/api/v1/media' + assert mock_post.call_args_list[1][0][0] == \ + 'https://nuxref.com/api/v1/media' + assert mock_post.call_args_list[2][0][0] == \ + 'https://nuxref.com/api/v1/media' + # Our status's will batch up and send the last 2 images in one + # and our animated gif in one. + assert mock_post.call_args_list[3][0][0] == \ + 'https://nuxref.com/api/v1/statuses' + assert mock_post.call_args_list[4][0][0] == \ + 'https://nuxref.com/api/v1/statuses' + assert mock_get.call_args_list[0][0][0] == \ + 'https://nuxref.com/api/v1/accounts/verify_credentials' + + # Test our status payload + payload = loads(mock_post.call_args_list[3][1]['data']) + assert 'status' in payload + assert payload['status'] == 'Apprise\r\nCheck this out @caronc' + assert 'sensitive' in payload + assert payload['sensitive'] is True + assert 'language' not in payload + assert 'Idempotency-Key' in payload + assert payload['Idempotency-Key'] == 'abcd' + assert 'media_ids' in payload + assert isinstance(payload['media_ids'], list) + assert len(payload['media_ids']) == 1 + assert payload['media_ids'][0] == '1' + + payload = loads(mock_post.call_args_list[4][1]['data']) + assert 'status' in payload + assert payload['status'] == '02/02' + assert 'sensitive' in payload + assert payload['sensitive'] is False + assert 'language' not in payload + assert 'Idempotency-Key' in payload + assert payload['Idempotency-Key'] == 'abcd-part01' + assert 'media_ids' in payload + assert isinstance(payload['media_ids'], list) + assert len(payload['media_ids']) == 2 + assert '2' in payload['media_ids'] + assert '3' in payload['media_ids'] + + # A second call does not cause us to look up our ID as we already know it + mock_get.reset_mock() + mock_post.reset_mock() + mock_post.side_effect = [mr1, mr2, mr3, good_response, good_response] + mock_get.return_value = good_whoami_response + assert obj.notify( + body='Check this out @caronc', title='Apprise', + notify_type=NotifyType.INFO, + attach=attach) is True + + # Same number of posts + assert mock_post.call_count == 5 + # But no lookup was made + assert mock_get.call_count == 0 + + mock_get.reset_mock() + mock_post.reset_mock() + + # Prepare an attach list + attach = ( + os.path.join(TEST_VAR_DIR, 'apprise-test.png'), + os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg'), + ) + + mock_post.side_effect = [mr2, mr3, good_response, good_response] + mock_get.return_value = good_whoami_response + + # instantiate our object (but turn off the batch mode) + mastodon_url = 'mastodons://{}@{}?batch=no'.format( + akey, host) + obj = Apprise.instantiate(mastodon_url) + + assert obj.notify( + body='Check this out @caronc', title='Apprise', + notify_type=NotifyType.INFO, + attach=attach) is True + + # 2 attachments + 2 different status messages + assert mock_post.call_count == 4 + + # But no lookup was made + assert mock_get.call_count == 0 + + assert mock_post.call_args_list[0][0][0] == \ + 'https://nuxref.com/api/v1/media' + assert mock_post.call_args_list[1][0][0] == \ + 'https://nuxref.com/api/v1/media' + assert mock_post.call_args_list[2][0][0] == \ + 'https://nuxref.com/api/v1/statuses' + assert mock_post.call_args_list[3][0][0] == \ + 'https://nuxref.com/api/v1/statuses' + + mock_get.reset_mock() + mock_post.reset_mock() + + # Prepare a bad media response + bad_response = mock.Mock() + bad_response.status_code = requests.codes.internal_server_error + + bad_responses = ( + dumps({"error": "authorized scopes"}), + '', + ) + + # + # Test our Media failures + # + + # Try several bad responses so we can capture the block of code where + # we try to help the end user to remind them what scope they're missing + for response in bad_responses: + mock_post.side_effect = [good_media_response, bad_response] + bad_response.content = response + + # instantiate our object + mastodon_url = \ + 'mastodons://{}@{}?visibility=public&spoiler=uhoh'.format( + akey, host) + obj = Apprise.instantiate(mastodon_url) + + # Our notification will fail now since our toot will error out + # This is the same test as above, except our error response isn't + # parseable + assert obj.notify( + body='body', title='title', notify_type=NotifyType.INFO, + attach=attach) is False + + # Test our call count + assert mock_get.call_count == 0 + assert mock_post.call_count == 2 + assert mock_post.call_args_list[0][0][0] == \ + 'https://nuxref.com/api/v1/media' + assert mock_post.call_args_list[1][0][0] == \ + 'https://nuxref.com/api/v1/media' + + mock_get.reset_mock() + mock_post.reset_mock() + + # + # Test our Status failures + # + + # Try several bad responses so we can capture the block of code where + # we try to help the end user to remind them what scope they're missing + for response in bad_responses: + mock_post.side_effect = [bad_response] + bad_response.content = response + + # instantiate our object + mastodon_url = 'mastodons://{}@{}'.format(akey, host) + obj = Apprise.instantiate(mastodon_url) + + # Our notification will fail now since our toot will error out + # This is the same test as above, except our error response isn't + # parseable + assert obj.notify( + body='body', title='title', notify_type=NotifyType.INFO) is False + + # Test our call count + assert mock_get.call_count == 0 + assert mock_post.call_count == 1 + assert mock_post.call_args_list[0][0][0] == \ + 'https://nuxref.com/api/v1/statuses' + + mock_get.reset_mock() + mock_post.reset_mock() + + # + # Test our whoami failures + # + + # Try several bad responses so we can capture the block of code where + # we try to help the end user to remind them what scope they're missing + for response in bad_responses: + mock_get.side_effect = [bad_response] + bad_response.content = response + + # instantiate our object + mastodon_url = 'mastodons://{}@{}?visibility=direct'.format(akey, host) + obj = Apprise.instantiate(mastodon_url) + + # Our notification will fail now since our toot will error out + # This is the same test as above, except our error response isn't + # parseable + assert obj.notify( + body='body', title='title', notify_type=NotifyType.INFO) is False + + # Test our call count + assert mock_get.call_count == 1 + assert mock_post.call_count == 0 + assert mock_get.call_args_list[0][0][0] == \ + 'https://nuxref.com/api/v1/accounts/verify_credentials' + + mock_get.reset_mock() + mock_post.reset_mock() + + mock_post.side_effect = [mr1, mr2, mr3, good_response, good_response] + mock_get.return_value = None + + # instantiate our object + mastodon_url = 'mastodons://{}@{}'.format(akey, host) + obj = Apprise.instantiate(mastodon_url) + + # An invalid attachment will cause a failure + path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg') + assert obj.notify( + body='body', title='title', notify_type=NotifyType.INFO, + attach=path) is False + + # No get requests are made + assert mock_get.call_count == 0 + + # No post request as attachment is no good anyway + assert mock_post.call_count == 0 + + mock_get.reset_mock() + mock_post.reset_mock() + + # We have an OSError thrown in the middle of our preparation + mock_post.side_effect = [ + good_media_response, OSError(), good_media_response] + mock_get.return_value = good_response + + # 3 images are produced + attach = [ + os.path.join(TEST_VAR_DIR, 'apprise-test.gif'), + # This one is not supported, so it's ignored gracefully + os.path.join(TEST_VAR_DIR, 'apprise-archive.zip'), + # A supported video file + os.path.join(TEST_VAR_DIR, 'apprise-test.mp4'), + ] + + # We'll fail to send this time + assert obj.notify( + body='body', title='title', notify_type=NotifyType.INFO, + attach=attach) is False + + assert mock_get.call_count == 0 + # No get request as cached response is used + assert mock_post.call_count == 2 + assert mock_post.call_args_list[0][0][0] == \ + 'https://nuxref.com/api/v1/media' + assert mock_post.call_args_list[1][0][0] == \ + 'https://nuxref.com/api/v1/media' diff --git a/test/var/apprise-archive.zip b/test/var/apprise-archive.zip new file mode 100644 index 00000000..946b819c Binary files /dev/null and b/test/var/apprise-archive.zip differ