mirror of
https://github.com/caronc/apprise.git
synced 2024-11-21 15:43:27 +01:00
Mastodon Support (#747)
This commit is contained in:
parent
32992fa641
commit
6d3ab9b3fd
@ -79,6 +79,7 @@ The table below identifies the services this tool supports and some example serv
|
||||
| [LaMetric Time](https://github.com/caronc/apprise/wiki/Notify_lametric) | lametric:// | (TCP) 443 | lametric://apikey@device_ipaddr<br/>lametric://apikey@hostname:port<br/>lametric://client_id@client_secret
|
||||
| [Line](https://github.com/caronc/apprise/wiki/Notify_line) | line:// | (TCP) 443 | line://Token@User<br/>line://Token/User1/User2/UserN
|
||||
| [Mailgun](https://github.com/caronc/apprise/wiki/Notify_mailgun) | mailgun:// | (TCP) 443 | mailgun://user@hostname/apikey<br />mailgun://user@hostname/apikey/email<br />mailgun://user@hostname/apikey/email1/email2/emailN<br />mailgun://user@hostname/apikey/?name="From%20User"
|
||||
| [Mastodon](https://github.com/caronc/apprise/wiki/Notify_mastodon) | mastodon:// or mastodons://| (TCP) 80 or 443 | mastodon://access_key@hostname<br />mastodon://access_key@hostname/@user<br />mastodon://access_key@hostname/@user1/@user2/@userN
|
||||
| [Matrix](https://github.com/caronc/apprise/wiki/Notify_matrix) | matrix:// or matrixs:// | (TCP) 80 or 443 | matrix://hostname<br />matrix://user@hostname<br />matrixs://user:pass@hostname:port/#room_alias<br />matrixs://user:pass@hostname:port/!room_id<br />matrixs://user:pass@hostname:port/#room_alias/!room_id/#room2<br />matrixs://token@hostname:port/?webhook=matrix<br />matrix://user:token@hostname/?webhook=slack&format=markdown
|
||||
| [Mattermost](https://github.com/caronc/apprise/wiki/Notify_mattermost) | mmost:// or mmosts:// | (TCP) 8065 | mmost://hostname/authkey<br />mmost://hostname:80/authkey<br />mmost://user@hostname:80/authkey<br />mmost://hostname/authkey?channel=channel<br />mmosts://hostname/authkey<br />mmosts://user@hostname/authkey<br />
|
||||
| [Microsoft Teams](https://github.com/caronc/apprise/wiki/Notify_msteams) | msteams:// | (TCP) 443 | msteams://TokenA/TokenB/TokenC/
|
||||
|
@ -170,6 +170,11 @@ class AppriseAttachment:
|
||||
return_status = False
|
||||
continue
|
||||
|
||||
elif isinstance(_attachment, AppriseAttachment):
|
||||
# We were provided a list of Apprise Attachments
|
||||
# append our content together
|
||||
instance = _attachment.attachments
|
||||
|
||||
elif not isinstance(_attachment, attachment.AttachBase):
|
||||
logger.warning(
|
||||
"An invalid attachment (type={}) was specified.".format(
|
||||
@ -196,7 +201,11 @@ class AppriseAttachment:
|
||||
continue
|
||||
|
||||
# Add our initialized plugin to our server listings
|
||||
self.attachments.append(instance)
|
||||
if isinstance(instance, list):
|
||||
self.attachments.extend(instance)
|
||||
|
||||
else:
|
||||
self.attachments.append(instance)
|
||||
|
||||
# Return our status
|
||||
return return_status
|
||||
|
976
apprise/plugins/NotifyMastodon.py
Normal file
976
apprise/plugins/NotifyMastodon.py
Normal file
@ -0,0 +1,976 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2022 Chris Caron <lead2gold@gmail.com>
|
||||
# All rights reserved.
|
||||
#
|
||||
# This code is licensed under the MIT License.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files(the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions :
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
|
||||
import re
|
||||
import requests
|
||||
from copy import deepcopy
|
||||
from json import dumps, loads
|
||||
from datetime import datetime
|
||||
|
||||
from .NotifyBase import NotifyBase
|
||||
from ..URLBase import PrivacyMode
|
||||
from ..common import NotifyImageSize
|
||||
from ..common import NotifyFormat
|
||||
from ..common import NotifyType
|
||||
from ..utils import parse_list
|
||||
from ..utils import parse_bool
|
||||
from ..utils import validate_regex
|
||||
from ..AppriseLocale import gettext_lazy as _
|
||||
from ..attachment.AttachBase import AttachBase
|
||||
|
||||
# Accept:
|
||||
# - @username
|
||||
# - username
|
||||
# - username@host.com
|
||||
# - @username@host.com
|
||||
IS_USER = re.compile(
|
||||
r'^\s*@?(?P<user>[A-Z0-9_]+(?:@(?P<host>[A-Z0-9_.-]+))?)$', re.I)
|
||||
|
||||
USER_DETECTION_RE = re.compile(
|
||||
r'(@[A-Z0-9_]+(?:@[A-Z0-9_.-]+)?)(?=$|[\s,.&()\[\]]+)', re.I)
|
||||
|
||||
|
||||
class MastodonMessageVisibility:
|
||||
"""
|
||||
The visibility of any status message made
|
||||
"""
|
||||
# post visibility defaults to the accounts default-visibilty setting
|
||||
DEFAULT = 'default'
|
||||
|
||||
# post will be visible only to mentioned users
|
||||
# similar to a Twitter DM
|
||||
DIRECT = 'direct'
|
||||
|
||||
# post will be visible only to followers
|
||||
PRIVATE = 'private'
|
||||
|
||||
# post will be public but not appear on the public timeline
|
||||
UNLISTED = 'unlisted'
|
||||
|
||||
# post will be public
|
||||
PUBLIC = 'public'
|
||||
|
||||
|
||||
# Define the types in a list for validation purposes
|
||||
MASTODON_MESSAGE_VISIBILITIES = (
|
||||
MastodonMessageVisibility.DEFAULT,
|
||||
MastodonMessageVisibility.DIRECT,
|
||||
MastodonMessageVisibility.PRIVATE,
|
||||
MastodonMessageVisibility.UNLISTED,
|
||||
MastodonMessageVisibility.PUBLIC,
|
||||
)
|
||||
|
||||
|
||||
class NotifyMastodon(NotifyBase):
|
||||
"""
|
||||
A wrapper for Notify Mastodon Notifications
|
||||
"""
|
||||
|
||||
# The default descriptive name associated with the Notification
|
||||
service_name = 'Mastodon'
|
||||
|
||||
# The services URL
|
||||
service_url = 'https://joinmastodon.org'
|
||||
|
||||
# The default protocol
|
||||
protocol = ('mastodon', 'toot')
|
||||
|
||||
# The default secure protocol
|
||||
secure_protocol = ('mastodons', 'toots')
|
||||
|
||||
# A URL that takes you to the setup/help of the specific protocol
|
||||
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_mastodon'
|
||||
|
||||
# Allows the user to specify the NotifyImageSize object; this is supported
|
||||
# through the webhook
|
||||
image_size = NotifyImageSize.XY_128
|
||||
|
||||
# it is documented on the site that the maximum images per toot
|
||||
# is 4 (unless it's a GIF, then it's only 1)
|
||||
__toot_non_gif_images_batch = 4
|
||||
|
||||
# Mastodon API Reference To Acquire Current Users Information
|
||||
# See: https://docs.joinmastodon.org/methods/accounts/
|
||||
# Requires Scope Element: read:accounts
|
||||
mastodon_whoami = '/api/v1/accounts/verify_credentials'
|
||||
|
||||
# URL for posting media files
|
||||
mastodon_media = '/api/v1/media'
|
||||
|
||||
# URL for posting status messages
|
||||
mastodon_toot = '/api/v1/statuses'
|
||||
|
||||
# URL for posting direct messages
|
||||
mastodon_dm = '/api/v1/dm'
|
||||
|
||||
# The title is not used
|
||||
title_maxlen = 0
|
||||
|
||||
# The maximum size of the message
|
||||
body_maxlen = 500
|
||||
|
||||
# Default to text
|
||||
notify_format = NotifyFormat.TEXT
|
||||
|
||||
# Mastodon is kind enough to return how many more requests we're allowed to
|
||||
# continue to make within it's header response as:
|
||||
# X-Rate-Limit-Reset: The epoc time (in seconds) we can expect our
|
||||
# rate-limit to be reset.
|
||||
# X-Rate-Limit-Remaining: an integer identifying how many requests we're
|
||||
# still allow to make.
|
||||
request_rate_per_sec = 0
|
||||
|
||||
# For Tracking Purposes
|
||||
ratelimit_reset = datetime.utcnow()
|
||||
|
||||
# Default to 1000; users can send up to 1000 DM's and 2400 toot a day
|
||||
# This value only get's adjusted if the server sets it that way
|
||||
ratelimit_remaining = 1
|
||||
|
||||
# Define object templates
|
||||
templates = (
|
||||
'{schema}://{token}@{host}',
|
||||
'{schema}://{token}@{host}:{port}',
|
||||
'{schema}://{token}@{host}/{targets}',
|
||||
'{schema}://{token}@{host}:{port}/{targets}',
|
||||
)
|
||||
|
||||
# Define our template arguments
|
||||
template_tokens = dict(NotifyBase.template_tokens, **{
|
||||
'host': {
|
||||
'name': _('Hostname'),
|
||||
'type': 'string',
|
||||
'required': True,
|
||||
},
|
||||
'token': {
|
||||
'name': _('Access Token'),
|
||||
'type': 'string',
|
||||
'required': True,
|
||||
},
|
||||
'port': {
|
||||
'name': _('Port'),
|
||||
'type': 'int',
|
||||
'min': 1,
|
||||
'max': 65535,
|
||||
},
|
||||
'target_user': {
|
||||
'name': _('Target User'),
|
||||
'type': 'string',
|
||||
'prefix': '@',
|
||||
'map_to': 'targets',
|
||||
},
|
||||
'targets': {
|
||||
'name': _('Targets'),
|
||||
'type': 'list:string',
|
||||
},
|
||||
})
|
||||
|
||||
# Define our template arguments
|
||||
template_args = dict(NotifyBase.template_args, **{
|
||||
'token': {
|
||||
'alias_of': 'token',
|
||||
},
|
||||
'visibility': {
|
||||
'name': _('Visibility'),
|
||||
'type': 'choice:string',
|
||||
'values': MASTODON_MESSAGE_VISIBILITIES,
|
||||
'default': MastodonMessageVisibility.DEFAULT,
|
||||
},
|
||||
'cache': {
|
||||
'name': _('Cache Results'),
|
||||
'type': 'bool',
|
||||
'default': True,
|
||||
},
|
||||
'batch': {
|
||||
'name': _('Batch Mode'),
|
||||
'type': 'bool',
|
||||
'default': True,
|
||||
},
|
||||
'sensitive': {
|
||||
'name': _('Sensitive Attachments'),
|
||||
'type': 'bool',
|
||||
'default': False,
|
||||
},
|
||||
'spoiler': {
|
||||
'name': _('Spoiler Text'),
|
||||
'type': 'string',
|
||||
},
|
||||
'key': {
|
||||
'name': _('Idempotency-Key'),
|
||||
'type': 'string',
|
||||
},
|
||||
'language': {
|
||||
'name': _('Language Code'),
|
||||
'type': 'string',
|
||||
},
|
||||
'to': {
|
||||
'alias_of': 'targets',
|
||||
},
|
||||
})
|
||||
|
||||
def __init__(self, token=None, targets=None, batch=True,
|
||||
sensitive=None, spoiler=None, visibility=None, cache=True,
|
||||
key=None, language=None, **kwargs):
|
||||
"""
|
||||
Initialize Notify Mastodon Object
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Set our schema
|
||||
self.schema = 'https' if self.secure else 'http'
|
||||
|
||||
# Initialize our cache value
|
||||
self._whoami_cache = None
|
||||
|
||||
self.token = validate_regex(token)
|
||||
if not self.token:
|
||||
msg = 'An invalid Twitter Consumer Key was specified.'
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
if visibility:
|
||||
# Input is a string; attempt to get the lookup from our
|
||||
# sound mapping
|
||||
vis = 'invalid' if not isinstance(visibility, str) \
|
||||
else visibility.lower().strip()
|
||||
|
||||
# This little bit of black magic allows us to match against
|
||||
# against multiple versions of the same string
|
||||
# ... etc
|
||||
self.visibility = \
|
||||
next((v for v in MASTODON_MESSAGE_VISIBILITIES
|
||||
if v.startswith(vis)), None)
|
||||
|
||||
if self.visibility not in MASTODON_MESSAGE_VISIBILITIES:
|
||||
msg = 'The Mastodon visibility specified ({}) is invalid.' \
|
||||
.format(visibility)
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
else:
|
||||
self.visibility = \
|
||||
self.template_args['visibility']['default']
|
||||
|
||||
# Prepare our URL
|
||||
self.api_url = '%s://%s' % (self.schema, self.host)
|
||||
|
||||
if isinstance(self.port, int):
|
||||
self.api_url += ':%d' % self.port
|
||||
|
||||
# Set Cache Flag
|
||||
self.cache = cache
|
||||
|
||||
# Prepare Image Batch Mode Flag
|
||||
self.batch = self.template_args['batch']['default'] \
|
||||
if batch is None else batch
|
||||
|
||||
# Images to be marked sensitive
|
||||
self.sensitive = self.template_args['sensitive']['default'] \
|
||||
if sensitive is None else sensitive
|
||||
|
||||
# Text marked as being a spoiler
|
||||
self.spoiler = spoiler if isinstance(spoiler, str) else None
|
||||
|
||||
# Idempotency Key
|
||||
self.idempotency_key = key if isinstance(key, str) else None
|
||||
|
||||
# Over-ride default language (ISO 639) (e.g: en, fr, es, etc)
|
||||
self.language = language if isinstance(language, str) else None
|
||||
|
||||
# Our target users
|
||||
self.targets = []
|
||||
|
||||
# Track any errors
|
||||
has_error = False
|
||||
|
||||
# Identify our targets
|
||||
for target in parse_list(targets):
|
||||
match = IS_USER.match(target)
|
||||
if match and match.group('user'):
|
||||
self.targets.append('@' + match.group('user'))
|
||||
continue
|
||||
|
||||
has_error = True
|
||||
self.logger.warning(
|
||||
'Dropped invalid Mastodon user ({}) specified.'.format(target),
|
||||
)
|
||||
|
||||
if has_error and not self.targets:
|
||||
# We have specified that we want to notify one or more individual
|
||||
# and we failed to load any of them. Since it's also valid to
|
||||
# notify no one at all (which means we notify ourselves), it's
|
||||
# important we don't switch from the users original intentions
|
||||
msg = 'No Mastodon targets to notify.'
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
return
|
||||
|
||||
def url(self, privacy=False, *args, **kwargs):
|
||||
"""
|
||||
Returns the URL built dynamically based on specified arguments.
|
||||
"""
|
||||
|
||||
# Define any URL parameters
|
||||
params = {
|
||||
'visibility': self.visibility,
|
||||
'batch': 'yes' if self.batch else 'no',
|
||||
'sensitive': 'yes' if self.sensitive else 'no',
|
||||
}
|
||||
|
||||
# Extend our parameters
|
||||
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
|
||||
|
||||
if self.spoiler:
|
||||
# Our Spoiler if one was specified
|
||||
params['spoiler'] = self.spoiler
|
||||
|
||||
if self.idempotency_key:
|
||||
# Our Idempotency Key
|
||||
params['key'] = self.idempotency_key
|
||||
|
||||
if self.language:
|
||||
# Override Language
|
||||
params['language'] = self.language
|
||||
|
||||
default_port = 443 if self.secure else 80
|
||||
|
||||
return '{schema}://{token}@{host}{port}/{targets}?{params}'.format(
|
||||
schema=self.secure_protocol[0]
|
||||
if self.secure else self.protocol[0],
|
||||
token=self.pprint(
|
||||
self.token, privacy, mode=PrivacyMode.Secret, safe=''),
|
||||
# never encode hostname since we're expecting it to be a valid one
|
||||
host=self.host,
|
||||
port='' if self.port is None or self.port == default_port
|
||||
else ':{}'.format(self.port),
|
||||
targets='/'.join(
|
||||
[NotifyMastodon.quote(x, safe='@') for x in self.targets]),
|
||||
params=NotifyMastodon.urlencode(params),
|
||||
)
|
||||
|
||||
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
|
||||
**kwargs):
|
||||
"""
|
||||
wrapper to _send since we can alert more then one channel
|
||||
"""
|
||||
|
||||
# Build a list of our attachments
|
||||
attachments = []
|
||||
|
||||
# Smart Target Detection for Direct Messages; this prevents us from
|
||||
# adding @user entries that were already placed in the message body
|
||||
users = set(USER_DETECTION_RE.findall(body))
|
||||
targets = users - set(self.targets.copy())
|
||||
if not self.targets and self.visibility == \
|
||||
MastodonMessageVisibility.DIRECT:
|
||||
|
||||
result = self._whoami()
|
||||
if not result:
|
||||
# Could not access our status
|
||||
return False
|
||||
|
||||
myself = '@' + next(iter(result.keys()))
|
||||
if myself in users:
|
||||
targets.remove(myself)
|
||||
|
||||
else:
|
||||
targets.add(myself)
|
||||
|
||||
if attach:
|
||||
# We need to upload our payload first so that we can source it
|
||||
# in remaining messages
|
||||
for attachment in attach:
|
||||
|
||||
# Perform some simple error checking
|
||||
if not attachment:
|
||||
# We could not access the attachment
|
||||
self.logger.error(
|
||||
'Could not access attachment {}.'.format(
|
||||
attachment.url(privacy=True)))
|
||||
return False
|
||||
|
||||
#
|
||||
# Images (PNG, JPG, GIF) up to 8MB.
|
||||
# - Images will be downscaled to 1.6 megapixels (enough for a
|
||||
# 1280x1280 image).
|
||||
# - Up to 4 images can be attached.
|
||||
# - Animated GIFs are converted to soundless MP4s like on
|
||||
# Imgur/Gfycat (GIFV).
|
||||
# - You can also upload soundless MP4 and WebM, which will
|
||||
# be handled the same way.
|
||||
# Videos (MP4, M4V, MOV, WebM) up to 40MB.
|
||||
# - Video will be transcoded to H.264 MP4 with a maximum
|
||||
# bitrate of 1300kbps and framerate of 60fps.
|
||||
# Audio (MP3, OGG, WAV, FLAC, OPUS, AAC, M4A, 3GP) up to 40MB.
|
||||
# - Audio will be transcoded to MP3 using V2 VBR (roughly
|
||||
# 192kbps).
|
||||
if not re.match(r'^(image|video|audio)/.*',
|
||||
attachment.mimetype, re.I):
|
||||
# Only support images at this time
|
||||
self.logger.warning(
|
||||
'Ignoring unsupported Mastodon attachment {}.'.format(
|
||||
attachment.url(privacy=True)))
|
||||
continue
|
||||
|
||||
self.logger.debug(
|
||||
'Preparing Mastodon attachment {}'.format(
|
||||
attachment.url(privacy=True)))
|
||||
|
||||
# Upload our image and get our id associated with it
|
||||
postokay, response = self._request(
|
||||
self.mastodon_media,
|
||||
payload=attachment,
|
||||
)
|
||||
|
||||
if not postokay:
|
||||
# We can't post our attachment
|
||||
if response and 'authorized scopes' \
|
||||
in response.get('error', ''):
|
||||
self.logger.warning(
|
||||
'Failed to Send Attachment to Mastodon: '
|
||||
'missing scope: write:media')
|
||||
|
||||
# All other failures should cause us to abort
|
||||
return False
|
||||
|
||||
if not (isinstance(response, dict)
|
||||
and response.get('id')):
|
||||
self.logger.debug(
|
||||
'Could not attach the file to Mastodon: %s (mime=%s)',
|
||||
attachment.name, attachment.mimetype)
|
||||
continue
|
||||
|
||||
# If we get here, our output will look something like this:
|
||||
# {
|
||||
# 'id': '12345',
|
||||
# 'type': 'image',
|
||||
# 'url': 'https://.../6dad4663a.jpeg',
|
||||
# 'preview_url': 'https://.../adde6dad4663a.jpeg',
|
||||
# 'remote_url': None,
|
||||
# 'preview_remote_url': None,
|
||||
# 'text_url': None,
|
||||
# 'meta': {
|
||||
# 'original': {
|
||||
# 'width': 640,
|
||||
# 'height': 640,
|
||||
# 'size': '640x640',
|
||||
# 'aspect': 1.0
|
||||
# },
|
||||
# 'small': {
|
||||
# 'width': 400,
|
||||
# 'height': 400,
|
||||
# 'size': '400x400',
|
||||
# 'aspect': 1.0
|
||||
# }
|
||||
# },
|
||||
# 'description': None,
|
||||
# 'blurhash': 'UmIsdJnT^mX4V@XQofnQ~Ebq%4o3ofnQjZbt'
|
||||
# }
|
||||
response.update({
|
||||
# Update our response to additionally include the
|
||||
# attachment details
|
||||
'file_name': attachment.name,
|
||||
'file_mime': attachment.mimetype,
|
||||
'file_path': attachment.path,
|
||||
})
|
||||
|
||||
# Save our pre-prepared payload for attachment posting
|
||||
attachments.append(response)
|
||||
|
||||
payload = {
|
||||
'status': '{} {}'.format(' '.join(targets), body)
|
||||
if targets else body,
|
||||
'sensitive': self.sensitive,
|
||||
}
|
||||
|
||||
# Handle Visibility Flag
|
||||
if self.visibility != MastodonMessageVisibility.DEFAULT:
|
||||
payload['visibility'] = self.visibility
|
||||
|
||||
# Set Spoiler text (if set)
|
||||
if self.spoiler:
|
||||
payload['spoiler_text'] = self.spoiler
|
||||
|
||||
# Set Idempotency-Key (if set)
|
||||
if self.idempotency_key:
|
||||
payload['Idempotency-Key'] = self.idempotency_key
|
||||
|
||||
# Set Language
|
||||
if self.language:
|
||||
payload['language'] = self.language
|
||||
|
||||
payloads = []
|
||||
if not attachments:
|
||||
payloads.append(payload)
|
||||
|
||||
else:
|
||||
# Group our images if batch is set to do so
|
||||
batch_size = 1 if not self.batch \
|
||||
else self.__toot_non_gif_images_batch
|
||||
|
||||
# Track our batch control in our message generation
|
||||
batches = []
|
||||
batch = []
|
||||
for attachment in attachments:
|
||||
batch.append(attachment['id'])
|
||||
# Mastodon supports batching images together. This allows
|
||||
# the batching of multiple images together. Mastodon also
|
||||
# makes it clear that you can't batch `gif` files; they need
|
||||
# to be separate. So the below preserves the ordering that
|
||||
# a user passed their attachments in. if 4-non-gif images
|
||||
# are passed, they are all part of a single message.
|
||||
#
|
||||
# however, if they pass in image, gif, image, gif. The
|
||||
# gif's inbetween break apart the batches so this would
|
||||
# produce 4 separate toots.
|
||||
#
|
||||
# If you passed in, image, image, gif, image. <- This would
|
||||
# produce 3 images (as the first 2 images could be lumped
|
||||
# together as a batch)
|
||||
if not re.match(
|
||||
r'^image/(png|jpe?g)', attachment['file_mime'], re.I) \
|
||||
or len(batch) >= batch_size:
|
||||
batches.append(batch)
|
||||
batch = []
|
||||
|
||||
if batch:
|
||||
batches.append(batch)
|
||||
|
||||
for no, media_ids in enumerate(batches):
|
||||
_payload = deepcopy(payload)
|
||||
_payload['media_ids'] = media_ids
|
||||
|
||||
if no:
|
||||
# strip text and replace it with the image representation
|
||||
_payload['status'] = \
|
||||
'{:02d}/{:02d}'.format(no + 1, len(batches))
|
||||
# No longer sensitive information
|
||||
_payload['sensitive'] = False
|
||||
if self.idempotency_key:
|
||||
# Support multiposts while a Idempotency Key has been
|
||||
# defined
|
||||
_payload['Idempotency-Key'] = '{}-part{:02d}'.format(
|
||||
self.idempotency_key, no)
|
||||
payloads.append(_payload)
|
||||
|
||||
# Error Tracking
|
||||
has_error = False
|
||||
|
||||
for no, payload in enumerate(payloads, start=1):
|
||||
postokay, response = self._request(self.mastodon_toot, payload)
|
||||
if not postokay:
|
||||
# Track our error
|
||||
has_error = True
|
||||
|
||||
# We can't post our attachment
|
||||
if response and 'authorized scopes' \
|
||||
in response.get('error', ''):
|
||||
self.logger.warning(
|
||||
'Failed to Send Status to Mastodon: '
|
||||
'missing scope: write:statuses')
|
||||
|
||||
continue
|
||||
|
||||
# Example Attachment Output:
|
||||
# {
|
||||
# "id":"109315796435904505",
|
||||
# "created_at":"2022-11-09T20:44:39.017Z",
|
||||
# "in_reply_to_id":null,
|
||||
# "in_reply_to_account_id":null,
|
||||
# "sensitive":false,
|
||||
# "spoiler_text":"",
|
||||
# "visibility":"public",
|
||||
# "language":"en",
|
||||
# "uri":"https://host/users/caronc/statuses/109315796435904505",
|
||||
# "url":"https://host/@caronc/109315796435904505",
|
||||
# "replies_count":0,
|
||||
# "reblogs_count":0,
|
||||
# "favourites_count":0,
|
||||
# "edited_at":null,
|
||||
# "favourited":false,
|
||||
# "reblogged":false,
|
||||
# "muted":false,
|
||||
# "bookmarked":false,
|
||||
# "pinned":false,
|
||||
# "content":"<p>test</p>",
|
||||
# "reblog":null,
|
||||
# "application":{
|
||||
# "name":"Apprise Notifications",
|
||||
# "website":"https://github.com/caronc/apprise"
|
||||
# },
|
||||
# "account":{
|
||||
# "id":"109310334138718878",
|
||||
# "username":"caronc",
|
||||
# "acct":"caronc",
|
||||
# "display_name":"Chris",
|
||||
# "locked":false,
|
||||
# "bot":false,
|
||||
# "discoverable":false,
|
||||
# "group":false,
|
||||
# "created_at":"2022-11-08T00:00:00.000Z",
|
||||
# "note":"content",
|
||||
# "url":"https://host/@caronc",
|
||||
# "avatar":"https://host/path/file.png",
|
||||
# "avatar_static":"https://host/path/file.png",
|
||||
# "header":"https://host/headers/original/missing.png",
|
||||
# "header_static":"https://host/path/missing.png",
|
||||
# "followers_count":0,
|
||||
# "following_count":0,
|
||||
# "statuses_count":15,
|
||||
# "last_status_at":"2022-11-09",
|
||||
# "emojis":[
|
||||
#
|
||||
# ],
|
||||
# "fields":[
|
||||
#
|
||||
# ]
|
||||
# },
|
||||
# "media_attachments":[
|
||||
# {
|
||||
# "id":"109315796405707501",
|
||||
# "type":"image",
|
||||
# "url":"https://host/path/file.jpeg",
|
||||
# "preview_url":"https://host/path/file.jpeg",
|
||||
# "remote_url":null,
|
||||
# "preview_remote_url":null,
|
||||
# "text_url":null,
|
||||
# "meta":{
|
||||
# "original":{
|
||||
# "width":640,
|
||||
# "height":640,
|
||||
# "size":"640x640",
|
||||
# "aspect":1.0
|
||||
# },
|
||||
# "small":{
|
||||
# "width":400,
|
||||
# "height":400,
|
||||
# "size":"400x400",
|
||||
# "aspect":1.0
|
||||
# }
|
||||
# },
|
||||
# "description":null,
|
||||
# "blurhash":"UmIsdJnT^mX4V@XQofnQ~Ebq%4o3ofnQjZbt"
|
||||
# }
|
||||
# ],
|
||||
# "mentions":[
|
||||
#
|
||||
# ],
|
||||
# "tags":[
|
||||
#
|
||||
# ],
|
||||
# "emojis":[
|
||||
#
|
||||
# ],
|
||||
# "card":null,
|
||||
# "poll":null
|
||||
# }
|
||||
|
||||
try:
|
||||
url = '{}/web/@{}'.format(
|
||||
self.api_url,
|
||||
response['account']['username'])
|
||||
|
||||
except (KeyError, TypeError):
|
||||
url = 'unknown'
|
||||
|
||||
self.logger.debug(
|
||||
'Mastodon [%.2d/%.2d] (%d attached) delivered to %s',
|
||||
no, len(payloads), len(payload.get('media_ids', [])), url)
|
||||
|
||||
self.logger.info(
|
||||
'Sent [%.2d/%.2d] Mastodon notification as public toot.',
|
||||
no, len(payloads))
|
||||
|
||||
return not has_error
|
||||
|
||||
def _whoami(self, lazy=True):
|
||||
"""
|
||||
Looks details of current authenticated user
|
||||
|
||||
"""
|
||||
|
||||
if lazy and self._whoami_cache is not None:
|
||||
# Use cached response
|
||||
return self._whoami_cache
|
||||
|
||||
# Send Mastodon Whoami request
|
||||
postokay, response = self._request(
|
||||
self.mastodon_whoami,
|
||||
method='GET',
|
||||
)
|
||||
|
||||
if postokay:
|
||||
# Sample Response:
|
||||
# {
|
||||
# 'id': '12345',
|
||||
# 'username': 'caronc',
|
||||
# 'acct': 'caronc',
|
||||
# 'display_name': 'Chris',
|
||||
# 'locked': False,
|
||||
# 'bot': False,
|
||||
# 'discoverable': False,
|
||||
# 'group': False,
|
||||
# 'created_at': '2022-11-08T00:00:00.000Z',
|
||||
# 'note': 'details',
|
||||
# 'url': 'https://noc.social/@caronc',
|
||||
# 'avatar': 'https://host/path/image.png',
|
||||
# 'avatar_static': 'https://host/path/image.png',
|
||||
# 'header': 'https://host/path/missing.png',
|
||||
# 'header_static': 'https://host/path/missing.png',
|
||||
# 'followers_count': 0,
|
||||
# 'following_count': 0,
|
||||
# 'statuses_count': 2,
|
||||
# 'last_status_at': '2022-11-09',
|
||||
# 'source': {
|
||||
# 'privacy': 'public',
|
||||
# 'sensitive': False,
|
||||
# 'language': None,
|
||||
# 'note': 'details',
|
||||
# 'fields': [],
|
||||
# 'follow_requests_count': 0
|
||||
# },
|
||||
# 'emojis': [],
|
||||
# 'fields': []
|
||||
# }
|
||||
try:
|
||||
# Cache our response for future references
|
||||
self._whoami_cache = {
|
||||
response['username']: response['id']}
|
||||
|
||||
except (TypeError, KeyError):
|
||||
pass
|
||||
|
||||
elif response and 'authorized scopes' in response.get('error', ''):
|
||||
self.logger.warning(
|
||||
'Failed to lookup Mastodon Auth details; '
|
||||
'missing scope: read:accounts')
|
||||
|
||||
return self._whoami_cache if postokay else {}
|
||||
|
||||
def _request(self, path, payload=None, method='POST'):
|
||||
"""
|
||||
Wrapper to Mastodon API requests object
|
||||
"""
|
||||
|
||||
headers = {
|
||||
'User-Agent': self.app_id,
|
||||
'Authorization': f'Bearer {self.token}',
|
||||
}
|
||||
|
||||
data = None
|
||||
files = None
|
||||
|
||||
# Prepare our message
|
||||
url = '{}{}'.format(self.api_url, path)
|
||||
|
||||
# Some Debug Logging
|
||||
self.logger.debug('Mastodon {} URL: {} (cert_verify={})'.format(
|
||||
method, url, self.verify_certificate))
|
||||
|
||||
# Open our attachment path if required:
|
||||
if isinstance(payload, AttachBase):
|
||||
# prepare payload
|
||||
files = {
|
||||
'file': (payload.name, open(payload.path, 'rb'),
|
||||
'application/octet-stream')}
|
||||
|
||||
# Provide a description
|
||||
data = {
|
||||
'description': payload.name,
|
||||
}
|
||||
|
||||
else:
|
||||
headers['Content-Type'] = 'application/json'
|
||||
data = dumps(payload)
|
||||
self.logger.debug('Mastodon Payload: %s' % str(payload))
|
||||
|
||||
# Default content response object
|
||||
content = {}
|
||||
|
||||
# By default set wait to None
|
||||
wait = None
|
||||
|
||||
if self.ratelimit_remaining == 0:
|
||||
# Determine how long we should wait for or if we should wait at
|
||||
# all. This isn't fool-proof because we can't be sure the client
|
||||
# time (calling this script) is completely synced up with the
|
||||
# Mastodon server. One would hope we're on NTP and our clocks are
|
||||
# the same allowing this to role smoothly:
|
||||
|
||||
now = datetime.utcnow()
|
||||
if now < self.ratelimit_reset:
|
||||
# We need to throttle for the difference in seconds
|
||||
# We add 0.5 seconds to the end just to allow a grace
|
||||
# period.
|
||||
wait = (self.ratelimit_reset - now).total_seconds() + 0.5
|
||||
|
||||
# Always call throttle before any remote server i/o is made;
|
||||
self.throttle(wait=wait)
|
||||
|
||||
# acquire our request mode
|
||||
fn = requests.post if method == 'POST' else requests.get
|
||||
|
||||
try:
|
||||
r = fn(
|
||||
url,
|
||||
data=data,
|
||||
files=files,
|
||||
headers=headers,
|
||||
verify=self.verify_certificate,
|
||||
timeout=self.request_timeout,
|
||||
)
|
||||
|
||||
try:
|
||||
content = loads(r.content)
|
||||
|
||||
except (AttributeError, TypeError, ValueError):
|
||||
# ValueError = r.content is Unparsable
|
||||
# TypeError = r.content is None
|
||||
# AttributeError = r is None
|
||||
content = {}
|
||||
|
||||
if r.status_code not in (
|
||||
requests.codes.ok, requests.codes.created,
|
||||
requests.codes.accepted):
|
||||
|
||||
# We had a problem
|
||||
status_str = \
|
||||
NotifyMastodon.http_response_code_lookup(r.status_code)
|
||||
|
||||
self.logger.warning(
|
||||
'Failed to send Mastodon {} to {}: '
|
||||
'{}error={}.'.format(
|
||||
method,
|
||||
url,
|
||||
', ' if status_str else '',
|
||||
r.status_code))
|
||||
|
||||
self.logger.debug(
|
||||
'Response Details:\r\n{}'.format(r.content))
|
||||
|
||||
# Mark our failure
|
||||
return (False, content)
|
||||
|
||||
try:
|
||||
# Capture rate limiting if possible
|
||||
self.ratelimit_remaining = \
|
||||
int(r.headers.get('X-RateLimit-Remaining'))
|
||||
self.ratelimit_reset = datetime.utcfromtimestamp(
|
||||
int(r.headers.get('X-RateLimit-Limit')))
|
||||
|
||||
except (TypeError, ValueError):
|
||||
# This is returned if we could not retrieve this information
|
||||
# gracefully accept this state and move on
|
||||
pass
|
||||
|
||||
except requests.RequestException as e:
|
||||
self.logger.warning(
|
||||
'Exception received when sending Mastodon {} to {}: '.
|
||||
format(method, url))
|
||||
self.logger.debug('Socket Exception: %s' % str(e))
|
||||
|
||||
# Mark our failure
|
||||
return (False, content)
|
||||
|
||||
except (OSError, IOError) as e:
|
||||
self.logger.warning(
|
||||
'An I/O error occurred while handling {}.'.format(
|
||||
payload.name if isinstance(payload, AttachBase)
|
||||
else payload))
|
||||
self.logger.debug('I/O Exception: %s' % str(e))
|
||||
return (False, content)
|
||||
|
||||
finally:
|
||||
# Close our file (if it's open) stored in the second element
|
||||
# of our files tuple (index 1)
|
||||
if files:
|
||||
files['file'][1].close()
|
||||
|
||||
return (True, content)
|
||||
|
||||
@staticmethod
|
||||
def parse_url(url):
|
||||
"""
|
||||
Parses the URL and returns enough arguments that can allow
|
||||
us to re-instantiate this object.
|
||||
|
||||
"""
|
||||
results = NotifyBase.parse_url(url)
|
||||
if not results:
|
||||
# We're done early as we couldn't load the results
|
||||
return results
|
||||
|
||||
if 'token' in results['qsd'] and len(results['qsd']['token']):
|
||||
results['token'] = NotifyMastodon.unquote(results['qsd']['token'])
|
||||
|
||||
elif not results['password'] and results['user']:
|
||||
results['token'] = NotifyMastodon.unquote(results['user'])
|
||||
|
||||
# Apply our targets
|
||||
results['targets'] = NotifyMastodon.split_path(results['fullpath'])
|
||||
|
||||
# The defined Mastodon visibility
|
||||
if 'visibility' in results['qsd'] and \
|
||||
len(results['qsd']['visibility']):
|
||||
# Simplified version
|
||||
results['visibility'] = \
|
||||
NotifyMastodon.unquote(results['qsd']['visibility'])
|
||||
|
||||
elif results['schema'].startswith('toot'):
|
||||
results['visibility'] = MastodonMessageVisibility.PUBLIC
|
||||
|
||||
# Get Idempotency Key (if specified)
|
||||
if 'key' in results['qsd'] and len(results['qsd']['key']):
|
||||
results['key'] = \
|
||||
NotifyMastodon.unquote(results['qsd']['key'])
|
||||
|
||||
# Get Spoiler Text
|
||||
if 'spoiler' in results['qsd'] and len(results['qsd']['spoiler']):
|
||||
results['spoiler'] = \
|
||||
NotifyMastodon.unquote(results['qsd']['spoiler'])
|
||||
|
||||
# Get Language (if specified)
|
||||
if 'language' in results['qsd'] and len(results['qsd']['language']):
|
||||
results['language'] = \
|
||||
NotifyMastodon.unquote(results['qsd']['language'])
|
||||
|
||||
# Get Sensitive Flag (for Attachments)
|
||||
results['sensitive'] = \
|
||||
parse_bool(results['qsd'].get(
|
||||
'sensitive',
|
||||
NotifyMastodon.template_args['sensitive']['default']))
|
||||
|
||||
# Get Batch Mode Flag
|
||||
results['batch'] = \
|
||||
parse_bool(results['qsd'].get(
|
||||
'batch', NotifyMastodon.template_args['batch']['default']))
|
||||
|
||||
# The 'to' makes it easier to use yaml configuration
|
||||
if 'to' in results['qsd'] and len(results['qsd']['to']):
|
||||
results['targets'] += \
|
||||
NotifyMastodon.parse_list(results['qsd']['to'])
|
||||
|
||||
return results
|
@ -39,10 +39,10 @@ Apprise API, AWS SES, AWS SNS, Bark, BulkSMS, Boxcar, ClickSend, DAPNET,
|
||||
DingTalk, Discord, E-Mail, Emby, Faast, FCM, Flock, Gitter, Google Chat,
|
||||
Gotify, Growl, Guilded, Home Assistant, IFTTT, Join, Kavenegar, KODI, Kumulos,
|
||||
LaMetric, Line, MacOSX, Mailgun, Mattermost, Matrix, Microsoft Windows,
|
||||
Microsoft Teams, MessageBird, MQTT, MSG91, MyAndroid, Nexmo, Nextcloud,
|
||||
NextcloudTalk, Notica, Notifico, ntfy, Office365, OneSignal, Opsgenie,
|
||||
PagerDuty, ParsePlatform, PopcornNotify, Prowl, Pushalot, PushBullet, Pushjet,
|
||||
Pushover, PushSafer, Reddit, Rocket.Chat, SendGrid, ServerChan, Signal,
|
||||
Mastodon, Microsoft Teams, MessageBird, MQTT, MSG91, MyAndroid, Nexmo,
|
||||
Nextcloud, NextcloudTalk, Notica, Notifico, ntfy, Office365, OneSignal,
|
||||
Opsgenie, PagerDuty, ParsePlatform, PopcornNotify, Prowl, Pushalot, PushBullet,
|
||||
Pushjet, Pushover, PushSafer, Reddit, Rocket.Chat, SendGrid, ServerChan, Signal,
|
||||
SimplePush, Sinch, Slack, SMSEagle, SMTP2Go, Spontit, SparkPost, Super Toasty,
|
||||
Streamlabs, Stride, Syslog, Techulus Push, Telegram, Twilio, Twitter, Twist,
|
||||
XBMC, Vonage, Webex Teams}
|
||||
|
747
test/test_plugin_mastodon.py
Normal file
747
test/test_plugin_mastodon.py
Normal file
@ -0,0 +1,747 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2022 Chris Caron <lead2gold@gmail.com>
|
||||
# All rights reserved.
|
||||
#
|
||||
# This code is licensed under the MIT License.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files(the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions :
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
import requests
|
||||
from json import dumps, loads
|
||||
from datetime import datetime
|
||||
from apprise import Apprise
|
||||
from apprise import NotifyType
|
||||
from apprise import AppriseAttachment
|
||||
from apprise.plugins.NotifyMastodon import NotifyMastodon
|
||||
from helpers import AppriseURLTester
|
||||
|
||||
# Disable logging for a cleaner testing output
|
||||
import logging
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
# Attachment Directory
|
||||
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
|
||||
|
||||
# Our Testing URLs
|
||||
apprise_url_tests = (
|
||||
##################################
|
||||
# NotifyMastodon
|
||||
##################################
|
||||
('mastodon://', {
|
||||
# Missing Everything :)
|
||||
'instance': None,
|
||||
}),
|
||||
('mastodon://:@/', {
|
||||
'instance': None,
|
||||
}),
|
||||
('mastodon://hostname', {
|
||||
# Missing Access Token
|
||||
'instance': TypeError,
|
||||
}),
|
||||
('toot://access_token@hostname', {
|
||||
# We're good; it's a simple notification
|
||||
'instance': NotifyMastodon,
|
||||
}),
|
||||
('toots://access_token@hostname', {
|
||||
# We're good; it's another simple notification
|
||||
'instance': NotifyMastodon,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'mastodons://****@hostname/'
|
||||
}),
|
||||
('mastodon://access_token@hostname/@user/@user2', {
|
||||
# We're good; it's another simple notification
|
||||
'instance': NotifyMastodon,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'mastodon://****@hostname/@user/@user2'
|
||||
}),
|
||||
('mastodon://hostname/@user/@user2?token=abcd123', {
|
||||
# Our access token can be provided as a token= variable
|
||||
'instance': NotifyMastodon,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'mastodon://****@hostname/@user/@user2'
|
||||
}),
|
||||
('mastodon://access_token@hostname?to=@user, @user2', {
|
||||
# We're good; it's another simple notification
|
||||
'instance': NotifyMastodon,
|
||||
# Our expected url(privacy=True) startswith() response:
|
||||
'privacy_url': 'mastodon://****@hostname/@user/@user2'
|
||||
}),
|
||||
('mastodon://access_token@hostname/?cache=no', {
|
||||
# disable cache as a test
|
||||
'instance': NotifyMastodon,
|
||||
}),
|
||||
('mastodon://access_token@hostname/?spoiler=spoiler%20text', {
|
||||
# a public post
|
||||
'instance': NotifyMastodon,
|
||||
}),
|
||||
('mastodon://access_token@hostname/?language=en', {
|
||||
# over-ride our language
|
||||
'instance': NotifyMastodon,
|
||||
}),
|
||||
('mastodons://access_token@hostname:8443', {
|
||||
# A custom port specified
|
||||
'instance': NotifyMastodon,
|
||||
}),
|
||||
('mastodon://access_token@hostname/?key=My%20Idempotency%20Key', {
|
||||
# Prevent duplicate submissions of the same status. Idempotency
|
||||
# keys are stored for up to 1 hour, and can be any arbitrary string.
|
||||
# Consider using a hash or UUID generated client-side.
|
||||
'instance': NotifyMastodon,
|
||||
}),
|
||||
('mastodon://access_token@hostname/-/%/', {
|
||||
# Invalid users specified
|
||||
'instance': TypeError,
|
||||
}),
|
||||
('mastodon://access_token@hostname?visibility=invalid', {
|
||||
# An invalid visibility
|
||||
'instance': TypeError,
|
||||
}),
|
||||
('mastodon://access_token@hostname?visibility=direct', {
|
||||
# A direct message
|
||||
'instance': NotifyMastodon,
|
||||
# Expected notify() response False (because we won't
|
||||
# get the response we were expecting from the upstream
|
||||
# server
|
||||
'notify_response': False,
|
||||
}),
|
||||
('mastodon://access_token@hostname?visibility=direct', {
|
||||
# A direct message
|
||||
'instance': NotifyMastodon,
|
||||
# Provide a response that allows us to look our content up
|
||||
'requests_response_text': {
|
||||
'id': '12345',
|
||||
'username': 'test',
|
||||
},
|
||||
}),
|
||||
('toots://access_token@hostname', {
|
||||
'instance': NotifyMastodon,
|
||||
# Throws a series of connection and transfer exceptions when this flag
|
||||
# is set and tests that we gracfully handle them
|
||||
'test_requests_exceptions': True,
|
||||
}),
|
||||
('mastodons://access_token@hostname', {
|
||||
'instance': NotifyMastodon,
|
||||
# Throws a series of connection and transfer exceptions when this flag
|
||||
# is set and tests that we gracfully handle them
|
||||
'test_requests_exceptions': True,
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
def test_plugin_mastodon_urls():
|
||||
"""
|
||||
NotifyMastodon() Apprise URLs
|
||||
|
||||
"""
|
||||
|
||||
# Run our general tests
|
||||
AppriseURLTester(tests=apprise_url_tests).run_all()
|
||||
|
||||
|
||||
@mock.patch('requests.get')
|
||||
@mock.patch('requests.post')
|
||||
def test_plugin_mastodon_general(mock_post, mock_get):
|
||||
"""
|
||||
NotifyMastodon() General Tests
|
||||
|
||||
"""
|
||||
token = 'access_key'
|
||||
host = 'nuxref.com'
|
||||
|
||||
response_obj = {
|
||||
'username': 'caronc',
|
||||
'id': 1234,
|
||||
}
|
||||
|
||||
# Epoch time:
|
||||
epoch = datetime.utcfromtimestamp(0)
|
||||
|
||||
request = mock.Mock()
|
||||
request.content = dumps(response_obj)
|
||||
request.status_code = requests.codes.ok
|
||||
request.headers = {
|
||||
'X-RateLimit-Limit': (datetime.utcnow() - epoch).total_seconds(),
|
||||
'X-RateLimit-Remaining': 1,
|
||||
}
|
||||
|
||||
# Prepare Mock
|
||||
mock_get.return_value = request
|
||||
mock_post.return_value = request
|
||||
|
||||
# Instantiate our object
|
||||
obj = NotifyMastodon(token=token, host=host)
|
||||
|
||||
assert isinstance(obj, NotifyMastodon) is True
|
||||
assert isinstance(obj.url(), str) is True
|
||||
|
||||
# apprise room was found
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
# Change our status code and try again
|
||||
request.status_code = 403
|
||||
assert obj.send(body="test") is False
|
||||
assert obj.ratelimit_remaining == 1
|
||||
|
||||
# Return the status
|
||||
request.status_code = requests.codes.ok
|
||||
# Force a reset
|
||||
request.headers['X-RateLimit-Remaining'] = 0
|
||||
# behind the scenes, it should cause us to update our rate limit
|
||||
assert obj.send(body="test") is True
|
||||
assert obj.ratelimit_remaining == 0
|
||||
|
||||
# This should cause us to block
|
||||
request.headers['X-RateLimit-Remaining'] = 10
|
||||
assert obj.send(body="test") is True
|
||||
assert obj.ratelimit_remaining == 10
|
||||
|
||||
# Handle cases where we simply couldn't get this field
|
||||
del request.headers['X-RateLimit-Remaining']
|
||||
assert obj.send(body="test") is True
|
||||
# It remains set to the last value
|
||||
assert obj.ratelimit_remaining == 10
|
||||
|
||||
# Reset our variable back to 1
|
||||
request.headers['X-RateLimit-Remaining'] = 1
|
||||
|
||||
# Handle cases where our epoch time is wrong
|
||||
del request.headers['X-RateLimit-Limit']
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
# Return our object, but place it in the future forcing us to block
|
||||
request.headers['X-RateLimit-Limit'] = \
|
||||
(datetime.utcnow() - epoch).total_seconds() + 1
|
||||
request.headers['X-RateLimit-Remaining'] = 0
|
||||
obj.ratelimit_remaining = 0
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
# Return our object, but place it in the future forcing us to block
|
||||
request.headers['X-RateLimit-Limit'] = \
|
||||
(datetime.utcnow() - epoch).total_seconds() - 1
|
||||
request.headers['X-RateLimit-Remaining'] = 0
|
||||
obj.ratelimit_remaining = 0
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
# Return our limits to always work
|
||||
request.headers['X-RateLimit-Limit'] = \
|
||||
(datetime.utcnow() - epoch).total_seconds()
|
||||
request.headers['X-RateLimit-Remaining'] = 1
|
||||
obj.ratelimit_remaining = 1
|
||||
|
||||
# Alter pending targets
|
||||
obj.targets.append('usera')
|
||||
request.content = dumps(response_obj)
|
||||
response_obj = {
|
||||
'username': 'usera',
|
||||
'id': 4321,
|
||||
}
|
||||
|
||||
# Cause content response to be None
|
||||
request.content = None
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
# Invalid JSON
|
||||
request.content = '{'
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
# Return it to a parseable string
|
||||
request.content = '{}'
|
||||
|
||||
results = NotifyMastodon.parse_url(
|
||||
'mastodon://{}@{}/@user?visbility=direct'.format(token, host))
|
||||
assert isinstance(results, dict) is True
|
||||
assert '@user' in results['targets']
|
||||
|
||||
# cause a json parsing issue now
|
||||
response_obj = None
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
response_obj = '{'
|
||||
assert obj.send(body="test") is True
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
#
|
||||
# Test our lazy lookups
|
||||
#
|
||||
|
||||
# Prepare a good status response
|
||||
request = mock.Mock()
|
||||
request.content = dumps({'id': '1234', 'username': 'caronc'})
|
||||
request.status_code = requests.codes.ok
|
||||
mock_get.return_value = request
|
||||
|
||||
mastodon_url = 'mastodons://key@host?visibility=direct'
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
obj._whoami(lazy=True)
|
||||
assert mock_get.call_count == 1
|
||||
assert mock_get.call_args_list[0][0][0] == \
|
||||
'https://host/api/v1/accounts/verify_credentials'
|
||||
|
||||
mock_get.reset_mock()
|
||||
obj._whoami(lazy=True)
|
||||
assert mock_get.call_count == 0
|
||||
|
||||
mock_get.reset_mock()
|
||||
obj._whoami(lazy=False)
|
||||
assert mock_get.call_count == 1
|
||||
assert mock_get.call_args_list[0][0][0] == \
|
||||
'https://host/api/v1/accounts/verify_credentials'
|
||||
|
||||
|
||||
@mock.patch('requests.post')
|
||||
@mock.patch('requests.get')
|
||||
def test_plugin_mastodon_attachments(mock_get, mock_post):
|
||||
"""
|
||||
NotifyMastodon() Toot Attachment Checks
|
||||
|
||||
"""
|
||||
akey = 'access_key'
|
||||
host = 'nuxref.com'
|
||||
username = 'caronc'
|
||||
|
||||
# Prepare a good status response
|
||||
good_response_obj = {
|
||||
'id': '1234',
|
||||
}
|
||||
|
||||
good_response = mock.Mock()
|
||||
good_response.content = dumps(good_response_obj)
|
||||
good_response.status_code = requests.codes.ok
|
||||
|
||||
# Prepare a good whoami response
|
||||
good_whoami_response_obj = {
|
||||
'username': username,
|
||||
'id': '9876',
|
||||
}
|
||||
|
||||
good_whoami_response = mock.Mock()
|
||||
good_whoami_response.content = dumps(good_whoami_response_obj)
|
||||
good_whoami_response.status_code = requests.codes.ok
|
||||
|
||||
# Prepare bad response
|
||||
bad_response = mock.Mock()
|
||||
bad_response.content = dumps({})
|
||||
bad_response.status_code = requests.codes.internal_server_error
|
||||
|
||||
# Prepare a good media response
|
||||
good_media_response = mock.Mock()
|
||||
good_media_response.content = dumps({
|
||||
"id": '710511363345354753',
|
||||
"file_mime": "image/jpeg",
|
||||
})
|
||||
good_media_response.status_code = requests.codes.ok
|
||||
|
||||
#
|
||||
# Start testing using fixtures above
|
||||
#
|
||||
mock_post.side_effect = [good_media_response, good_response]
|
||||
mock_get.return_value = good_whoami_response
|
||||
|
||||
mastodon_url = 'mastodon://{}@{}'.format(akey, host)
|
||||
|
||||
# attach our content
|
||||
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
|
||||
|
||||
# instantiate our object
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
|
||||
# Send our notification
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO,
|
||||
attach=attach) is True
|
||||
|
||||
# Test our call count
|
||||
assert mock_get.call_count == 0
|
||||
assert mock_post.call_count == 2
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
'http://nuxref.com/api/v1/media'
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
'http://nuxref.com/api/v1/statuses'
|
||||
|
||||
# Test our media payload
|
||||
assert 'files' in mock_post.call_args_list[0][1]
|
||||
assert 'file' in mock_post.call_args_list[0][1]['files']
|
||||
assert mock_post.call_args_list[0][1]['files']['file'][0] \
|
||||
== 'apprise-test.gif'
|
||||
|
||||
# Test our status payload
|
||||
payload = loads(mock_post.call_args_list[1][1]['data'])
|
||||
assert 'status' in payload
|
||||
assert payload['status'] == 'title\r\nbody'
|
||||
assert 'sensitive' in payload
|
||||
assert payload['sensitive'] is False
|
||||
assert 'media_ids' in payload
|
||||
assert isinstance(payload['media_ids'], list)
|
||||
assert len(payload['media_ids']) == 1
|
||||
assert payload['media_ids'][0] == '710511363345354753'
|
||||
|
||||
# Verify we don't set incorrect keys not otherwise specified
|
||||
assert 'spoiler_text' not in payload
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
#
|
||||
# Handle the query again, but this time perform a direct message
|
||||
# requiring us to look up who we are
|
||||
#
|
||||
mock_post.side_effect = [good_media_response, good_response]
|
||||
mock_get.return_value = good_whoami_response
|
||||
|
||||
mastodon_url = 'mastodon://{}@{}?visibility=direct'.format(akey, host)
|
||||
|
||||
# attach our content
|
||||
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
|
||||
|
||||
# instantiate our object
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
|
||||
# Send our notification
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO,
|
||||
attach=attach) is True
|
||||
|
||||
# Test our call count
|
||||
assert mock_get.call_count == 1
|
||||
assert mock_post.call_count == 2
|
||||
assert mock_get.call_args_list[0][0][0] == \
|
||||
'http://nuxref.com/api/v1/accounts/verify_credentials'
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
'http://nuxref.com/api/v1/media'
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
'http://nuxref.com/api/v1/statuses'
|
||||
|
||||
# Test our status payload
|
||||
payload = loads(mock_post.call_args_list[1][1]['data'])
|
||||
assert 'status' in payload
|
||||
# Our ID was added into the payload
|
||||
assert payload['status'] == '@caronc title\r\nbody'
|
||||
assert 'sensitive' in payload
|
||||
assert payload['sensitive'] is False
|
||||
assert 'media_ids' in payload
|
||||
assert isinstance(payload['media_ids'], list)
|
||||
assert len(payload['media_ids']) == 1
|
||||
assert payload['media_ids'][0] == '710511363345354753'
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
# Store 3 attachments
|
||||
attach = (
|
||||
AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif')),
|
||||
AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.png')),
|
||||
AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg')),
|
||||
)
|
||||
|
||||
# Prepare a good media response
|
||||
mr1 = mock.Mock()
|
||||
mr1.content = dumps({
|
||||
"id": '1',
|
||||
"file_mime": "image/gif",
|
||||
})
|
||||
mr1.status_code = requests.codes.ok
|
||||
|
||||
mr2 = mock.Mock()
|
||||
mr2.content = dumps({
|
||||
"id": '2',
|
||||
"file_mime": "image/png",
|
||||
})
|
||||
mr2.status_code = requests.codes.ok
|
||||
|
||||
mr3 = mock.Mock()
|
||||
mr3.content = dumps({
|
||||
"id": '3',
|
||||
"file_mime": "image/jpeg",
|
||||
})
|
||||
mr3.status_code = requests.codes.ok
|
||||
|
||||
# Return 3 good uploads and a good status response
|
||||
mock_post.side_effect = [mr1, mr2, mr3, good_response, good_response]
|
||||
mock_get.return_value = good_whoami_response
|
||||
|
||||
# instantiate our object
|
||||
mastodon_url = 'mastodons://{}@{}' \
|
||||
'?visibility=direct&sensitive=yes&key=abcd'.format(akey, host)
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
|
||||
# Send ourselves a direct message where our ID was already found
|
||||
# in the body. This smart detection method will prevent us from
|
||||
# adding the @caronc to the begining of the same message (since it's a
|
||||
# direct message)
|
||||
assert obj.notify(
|
||||
body='Check this out @caronc', title='Apprise',
|
||||
notify_type=NotifyType.INFO,
|
||||
attach=attach) is True
|
||||
|
||||
# Test our call count
|
||||
assert mock_get.call_count == 1
|
||||
assert mock_post.call_count == 5
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
||||
assert mock_post.call_args_list[2][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
||||
# Our status's will batch up and send the last 2 images in one
|
||||
# and our animated gif in one.
|
||||
assert mock_post.call_args_list[3][0][0] == \
|
||||
'https://nuxref.com/api/v1/statuses'
|
||||
assert mock_post.call_args_list[4][0][0] == \
|
||||
'https://nuxref.com/api/v1/statuses'
|
||||
assert mock_get.call_args_list[0][0][0] == \
|
||||
'https://nuxref.com/api/v1/accounts/verify_credentials'
|
||||
|
||||
# Test our status payload
|
||||
payload = loads(mock_post.call_args_list[3][1]['data'])
|
||||
assert 'status' in payload
|
||||
assert payload['status'] == 'Apprise\r\nCheck this out @caronc'
|
||||
assert 'sensitive' in payload
|
||||
assert payload['sensitive'] is True
|
||||
assert 'language' not in payload
|
||||
assert 'Idempotency-Key' in payload
|
||||
assert payload['Idempotency-Key'] == 'abcd'
|
||||
assert 'media_ids' in payload
|
||||
assert isinstance(payload['media_ids'], list)
|
||||
assert len(payload['media_ids']) == 1
|
||||
assert payload['media_ids'][0] == '1'
|
||||
|
||||
payload = loads(mock_post.call_args_list[4][1]['data'])
|
||||
assert 'status' in payload
|
||||
assert payload['status'] == '02/02'
|
||||
assert 'sensitive' in payload
|
||||
assert payload['sensitive'] is False
|
||||
assert 'language' not in payload
|
||||
assert 'Idempotency-Key' in payload
|
||||
assert payload['Idempotency-Key'] == 'abcd-part01'
|
||||
assert 'media_ids' in payload
|
||||
assert isinstance(payload['media_ids'], list)
|
||||
assert len(payload['media_ids']) == 2
|
||||
assert '2' in payload['media_ids']
|
||||
assert '3' in payload['media_ids']
|
||||
|
||||
# A second call does not cause us to look up our ID as we already know it
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
mock_post.side_effect = [mr1, mr2, mr3, good_response, good_response]
|
||||
mock_get.return_value = good_whoami_response
|
||||
assert obj.notify(
|
||||
body='Check this out @caronc', title='Apprise',
|
||||
notify_type=NotifyType.INFO,
|
||||
attach=attach) is True
|
||||
|
||||
# Same number of posts
|
||||
assert mock_post.call_count == 5
|
||||
# But no lookup was made
|
||||
assert mock_get.call_count == 0
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
# Prepare an attach list
|
||||
attach = (
|
||||
os.path.join(TEST_VAR_DIR, 'apprise-test.png'),
|
||||
os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg'),
|
||||
)
|
||||
|
||||
mock_post.side_effect = [mr2, mr3, good_response, good_response]
|
||||
mock_get.return_value = good_whoami_response
|
||||
|
||||
# instantiate our object (but turn off the batch mode)
|
||||
mastodon_url = 'mastodons://{}@{}?batch=no'.format(
|
||||
akey, host)
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
|
||||
assert obj.notify(
|
||||
body='Check this out @caronc', title='Apprise',
|
||||
notify_type=NotifyType.INFO,
|
||||
attach=attach) is True
|
||||
|
||||
# 2 attachments + 2 different status messages
|
||||
assert mock_post.call_count == 4
|
||||
|
||||
# But no lookup was made
|
||||
assert mock_get.call_count == 0
|
||||
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
||||
assert mock_post.call_args_list[2][0][0] == \
|
||||
'https://nuxref.com/api/v1/statuses'
|
||||
assert mock_post.call_args_list[3][0][0] == \
|
||||
'https://nuxref.com/api/v1/statuses'
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
# Prepare a bad media response
|
||||
bad_response = mock.Mock()
|
||||
bad_response.status_code = requests.codes.internal_server_error
|
||||
|
||||
bad_responses = (
|
||||
dumps({"error": "authorized scopes"}),
|
||||
'',
|
||||
)
|
||||
|
||||
#
|
||||
# Test our Media failures
|
||||
#
|
||||
|
||||
# Try several bad responses so we can capture the block of code where
|
||||
# we try to help the end user to remind them what scope they're missing
|
||||
for response in bad_responses:
|
||||
mock_post.side_effect = [good_media_response, bad_response]
|
||||
bad_response.content = response
|
||||
|
||||
# instantiate our object
|
||||
mastodon_url = \
|
||||
'mastodons://{}@{}?visibility=public&spoiler=uhoh'.format(
|
||||
akey, host)
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
|
||||
# Our notification will fail now since our toot will error out
|
||||
# This is the same test as above, except our error response isn't
|
||||
# parseable
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO,
|
||||
attach=attach) is False
|
||||
|
||||
# Test our call count
|
||||
assert mock_get.call_count == 0
|
||||
assert mock_post.call_count == 2
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
#
|
||||
# Test our Status failures
|
||||
#
|
||||
|
||||
# Try several bad responses so we can capture the block of code where
|
||||
# we try to help the end user to remind them what scope they're missing
|
||||
for response in bad_responses:
|
||||
mock_post.side_effect = [bad_response]
|
||||
bad_response.content = response
|
||||
|
||||
# instantiate our object
|
||||
mastodon_url = 'mastodons://{}@{}'.format(akey, host)
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
|
||||
# Our notification will fail now since our toot will error out
|
||||
# This is the same test as above, except our error response isn't
|
||||
# parseable
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO) is False
|
||||
|
||||
# Test our call count
|
||||
assert mock_get.call_count == 0
|
||||
assert mock_post.call_count == 1
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
'https://nuxref.com/api/v1/statuses'
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
#
|
||||
# Test our whoami failures
|
||||
#
|
||||
|
||||
# Try several bad responses so we can capture the block of code where
|
||||
# we try to help the end user to remind them what scope they're missing
|
||||
for response in bad_responses:
|
||||
mock_get.side_effect = [bad_response]
|
||||
bad_response.content = response
|
||||
|
||||
# instantiate our object
|
||||
mastodon_url = 'mastodons://{}@{}?visibility=direct'.format(akey, host)
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
|
||||
# Our notification will fail now since our toot will error out
|
||||
# This is the same test as above, except our error response isn't
|
||||
# parseable
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO) is False
|
||||
|
||||
# Test our call count
|
||||
assert mock_get.call_count == 1
|
||||
assert mock_post.call_count == 0
|
||||
assert mock_get.call_args_list[0][0][0] == \
|
||||
'https://nuxref.com/api/v1/accounts/verify_credentials'
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
mock_post.side_effect = [mr1, mr2, mr3, good_response, good_response]
|
||||
mock_get.return_value = None
|
||||
|
||||
# instantiate our object
|
||||
mastodon_url = 'mastodons://{}@{}'.format(akey, host)
|
||||
obj = Apprise.instantiate(mastodon_url)
|
||||
|
||||
# An invalid attachment will cause a failure
|
||||
path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO,
|
||||
attach=path) is False
|
||||
|
||||
# No get requests are made
|
||||
assert mock_get.call_count == 0
|
||||
|
||||
# No post request as attachment is no good anyway
|
||||
assert mock_post.call_count == 0
|
||||
|
||||
mock_get.reset_mock()
|
||||
mock_post.reset_mock()
|
||||
|
||||
# We have an OSError thrown in the middle of our preparation
|
||||
mock_post.side_effect = [
|
||||
good_media_response, OSError(), good_media_response]
|
||||
mock_get.return_value = good_response
|
||||
|
||||
# 3 images are produced
|
||||
attach = [
|
||||
os.path.join(TEST_VAR_DIR, 'apprise-test.gif'),
|
||||
# This one is not supported, so it's ignored gracefully
|
||||
os.path.join(TEST_VAR_DIR, 'apprise-archive.zip'),
|
||||
# A supported video file
|
||||
os.path.join(TEST_VAR_DIR, 'apprise-test.mp4'),
|
||||
]
|
||||
|
||||
# We'll fail to send this time
|
||||
assert obj.notify(
|
||||
body='body', title='title', notify_type=NotifyType.INFO,
|
||||
attach=attach) is False
|
||||
|
||||
assert mock_get.call_count == 0
|
||||
# No get request as cached response is used
|
||||
assert mock_post.call_count == 2
|
||||
assert mock_post.call_args_list[0][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
||||
assert mock_post.call_args_list[1][0][0] == \
|
||||
'https://nuxref.com/api/v1/media'
|
BIN
test/var/apprise-archive.zip
Normal file
BIN
test/var/apprise-archive.zip
Normal file
Binary file not shown.
Loading…
Reference in New Issue
Block a user