Revolt Plugin Refactored (#1062)

This commit is contained in:
Chris Caron 2024-02-17 19:25:17 -05:00 committed by GitHub
parent 67645909a3
commit 119b4f06e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 269 additions and 198 deletions

View File

@ -108,7 +108,7 @@ The table below identifies the services this tool supports and some example serv
| [Pushy](https://github.com/caronc/apprise/wiki/Notify_pushy) | pushy:// | (TCP) 443 | pushy://apikey/DEVICE<br />pushy://apikey/DEVICE1/DEVICE2/DEVICEN<br />pushy://apikey/TOPIC<br />pushy://apikey/TOPIC1/TOPIC2/TOPICN | [Pushy](https://github.com/caronc/apprise/wiki/Notify_pushy) | pushy:// | (TCP) 443 | pushy://apikey/DEVICE<br />pushy://apikey/DEVICE1/DEVICE2/DEVICEN<br />pushy://apikey/TOPIC<br />pushy://apikey/TOPIC1/TOPIC2/TOPICN
| [PushDeer](https://github.com/caronc/apprise/wiki/Notify_pushdeer) | pushdeer:// or pushdeers:// | (TCP) 80 or 443 | pushdeer://pushKey<br />pushdeer://hostname/pushKey<br />pushdeer://hostname:port/pushKey | [PushDeer](https://github.com/caronc/apprise/wiki/Notify_pushdeer) | pushdeer:// or pushdeers:// | (TCP) 80 or 443 | pushdeer://pushKey<br />pushdeer://hostname/pushKey<br />pushdeer://hostname:port/pushKey
| [Reddit](https://github.com/caronc/apprise/wiki/Notify_reddit) | reddit:// | (TCP) 443 | reddit://user:password@app_id/app_secret/subreddit<br />reddit://user:password@app_id/app_secret/sub1/sub2/subN | [Reddit](https://github.com/caronc/apprise/wiki/Notify_reddit) | reddit:// | (TCP) 443 | reddit://user:password@app_id/app_secret/subreddit<br />reddit://user:password@app_id/app_secret/sub1/sub2/subN
| [Revolt](https://github.com/caronc/apprise/wiki/Notify_Revolt) | revolt:// | (TCP) 443 | revolt://bot_token/channel_id | | [Revolt](https://github.com/caronc/apprise/wiki/Notify_Revolt) | revolt:// | (TCP) 443 | revolt://bottoken/ChannelID<br />revolt://bottoken/ChannelID1/ChannelID2/ChannelIDN |
| [Rocket.Chat](https://github.com/caronc/apprise/wiki/Notify_rocketchat) | rocket:// or rockets:// | (TCP) 80 or 443 | rocket://user:password@hostname/RoomID/Channel<br />rockets://user:password@hostname:443/#Channel1/#Channel1/RoomID<br />rocket://user:password@hostname/#Channel<br />rocket://webhook@hostname<br />rockets://webhook@hostname/@User/#Channel | [Rocket.Chat](https://github.com/caronc/apprise/wiki/Notify_rocketchat) | rocket:// or rockets:// | (TCP) 80 or 443 | rocket://user:password@hostname/RoomID/Channel<br />rockets://user:password@hostname:443/#Channel1/#Channel1/RoomID<br />rocket://user:password@hostname/#Channel<br />rocket://webhook@hostname<br />rockets://webhook@hostname/@User/#Channel
| [RSyslog](https://github.com/caronc/apprise/wiki/Notify_rsyslog) | rsyslog:// | (UDP) 514 | rsyslog://hostname<br />rsyslog://hostname/Facility | [RSyslog](https://github.com/caronc/apprise/wiki/Notify_rsyslog) | rsyslog:// | (UDP) 514 | rsyslog://hostname<br />rsyslog://hostname/Facility
| [Ryver](https://github.com/caronc/apprise/wiki/Notify_ryver) | ryver:// | (TCP) 443 | ryver://Organization/Token<br />ryver://botname@Organization/Token | [Ryver](https://github.com/caronc/apprise/wiki/Notify_ryver) | ryver:// | (TCP) 443 | ryver://Organization/Token<br />ryver://botname@Organization/Token

View File

@ -28,7 +28,7 @@
import re import re
from .logger import logger from .logger import logger
from time import sleep import time
from datetime import datetime from datetime import datetime
from xml.sax.saxutils import escape as sax_escape from xml.sax.saxutils import escape as sax_escape
@ -298,12 +298,12 @@ class URLBase:
if wait is not None: if wait is not None:
self.logger.debug('Throttling forced for {}s...'.format(wait)) self.logger.debug('Throttling forced for {}s...'.format(wait))
sleep(wait) time.sleep(wait)
elif elapsed < self.request_rate_per_sec: elif elapsed < self.request_rate_per_sec:
self.logger.debug('Throttling for {}s...'.format( self.logger.debug('Throttling for {}s...'.format(
self.request_rate_per_sec - elapsed)) self.request_rate_per_sec - elapsed))
sleep(self.request_rate_per_sec - elapsed) time.sleep(self.request_rate_per_sec - elapsed)
# Update our timestamp before we leave # Update our timestamp before we leave
self._last_io_datetime = datetime.now() self._last_io_datetime = datetime.now()

View File

@ -37,7 +37,7 @@
# #
import requests import requests
from json import dumps from json import dumps, loads
from datetime import timedelta from datetime import timedelta
from datetime import datetime from datetime import datetime
from datetime import timezone from datetime import timezone
@ -46,8 +46,8 @@ from .NotifyBase import NotifyBase
from ..common import NotifyImageSize from ..common import NotifyImageSize
from ..common import NotifyFormat from ..common import NotifyFormat
from ..common import NotifyType from ..common import NotifyType
from ..utils import parse_bool
from ..utils import validate_regex from ..utils import validate_regex
from ..utils import parse_list
from ..AppriseLocale import gettext_lazy as _ from ..AppriseLocale import gettext_lazy as _
@ -60,7 +60,7 @@ class NotifyRevolt(NotifyBase):
service_name = 'Revolt' service_name = 'Revolt'
# The services URL # The services URL
service_url = 'https://api.revolt.chat/' service_url = 'https://revolt.chat/'
# The default secure protocol # The default secure protocol
secure_protocol = 'revolt' secure_protocol = 'revolt'
@ -71,7 +71,7 @@ class NotifyRevolt(NotifyBase):
# Revolt Channel Message # Revolt Channel Message
notify_url = 'https://api.revolt.chat/' notify_url = 'https://api.revolt.chat/'
# Revolt supports attachments but don't implemenet for now # Revolt supports attachments but doesn't support it here (for now)
attachment_support = False attachment_support = False
# Allows the user to specify the NotifyImageSize object # Allows the user to specify the NotifyImageSize object
@ -85,8 +85,8 @@ class NotifyRevolt(NotifyBase):
# still allow to make. # still allow to make.
request_rate_per_sec = 3 request_rate_per_sec = 3
# Taken right from google.auth.helpers: # Safety net
clock_skew = timedelta(seconds=10) clock_skew = timedelta(seconds=2)
# The maximum allowable characters allowed in the body per message # The maximum allowable characters allowed in the body per message
body_maxlen = 2000 body_maxlen = 2000
@ -96,7 +96,7 @@ class NotifyRevolt(NotifyBase):
# Define object templates # Define object templates
templates = ( templates = (
'{schema}://{bot_token}/{channel_id}', '{schema}://{bot_token}/{targets}',
) )
# Defile out template tokens # Defile out template tokens
@ -107,39 +107,44 @@ class NotifyRevolt(NotifyBase):
'private': True, 'private': True,
'required': True, 'required': True,
}, },
'channel_id': { 'target_channel': {
'name': _('Channel Id'), 'name': _('Channel ID'),
'type': 'string', 'type': 'string',
'map_to': 'targets',
'regex': (r'^[a-z0-9_-]+$', 'i'),
'private': True, 'private': True,
'required': True, 'required': True,
}, },
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
}) })
# Define our template arguments # Define our template arguments
template_args = dict(NotifyBase.template_args, **{ template_args = dict(NotifyBase.template_args, **{
'channel_id': { 'channel': {
'alias_of': 'channel_id', 'alias_of': 'targets',
}, },
'bot_token': { 'bot_token': {
'alias_of': 'bot_token', 'alias_of': 'bot_token',
}, },
'embed_img': { 'icon_url': {
'name': _('Embed Image Url'), 'name': _('Icon URL'),
'type': 'string' 'type': 'string'
}, },
'embed_url': { 'url': {
'name': _('Embed Url'), 'name': _('Embed URL'),
'type': 'string' 'type': 'string',
'map_to': 'link',
},
'to': {
'alias_of': 'targets',
}, },
'custom_img': {
'name': _('Custom Embed Url'),
'type': 'bool',
'default': False
}
}) })
def __init__(self, bot_token, channel_id, embed_img=None, embed_url=None, def __init__(self, bot_token, targets, icon_url=None, link=None,
custom_img=None, **kwargs): **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
# Bot Token # Bot Token
@ -150,24 +155,27 @@ class NotifyRevolt(NotifyBase):
self.logger.warning(msg) self.logger.warning(msg)
raise TypeError(msg) raise TypeError(msg)
# Channel Id # Parse our Channel IDs
self.channel_id = validate_regex(channel_id) self.targets = []
if not self.channel_id: for target in parse_list(targets):
msg = 'An invalid Revolt Channel Id' \ results = validate_regex(
'({}) was specified.'.format(channel_id) target, *self.template_tokens['target_channel']['regex'])
self.logger.warning(msg)
raise TypeError(msg)
# Use custom image for embed image if not results:
self.custom_img = parse_bool(custom_img) \ self.logger.warning(
if custom_img is not None \ 'Dropped invalid Revolt channel ({}) specified.'
else self.template_args['custom_img']['default'] .format(target),
)
continue
# Add our target
self.targets.append(target)
# Image for Embed # Image for Embed
self.embed_img = embed_img self.icon_url = icon_url
# Url for embed title # Url for embed title
self.embed_url = embed_url self.link = link
# For Tracking Purposes # For Tracking Purposes
self.ratelimit_reset = datetime.now(timezone.utc).replace(tzinfo=None) self.ratelimit_reset = datetime.now(timezone.utc).replace(tzinfo=None)
@ -183,44 +191,47 @@ class NotifyRevolt(NotifyBase):
""" """
if len(self.targets) == 0:
self.logger.warning('There were not Revolt channels to notify.')
return False
payload = {} payload = {}
# Acquire image_url # Acquire image_url
image_url = self.image_url(notify_type) image_url = self.icon_url \
if self.icon_url else self.image_url(notify_type)
if self.custom_img and (image_url or self.embed_url): if self.notify_format == NotifyFormat.MARKDOWN:
image_url = self.embed_url if self.embed_url else image_url payload['embeds'] = [{
'title': None if not title else title[0:self.title_maxlen],
'description': body,
if body: # Our color associated with our notification
if self.notify_format == NotifyFormat.MARKDOWN: 'colour': self.color(notify_type),
if len(title) > 100: 'replies': None
msg = 'Title length must be less than 100 when ' \ }]
'embeds are enabled (is %s)' % len(title)
self.logger.warning(msg)
title = title[0:100]
payload['embeds'] = [{
'title': title,
'description': body,
# Our color associated with our notification if image_url:
'colour': self.color(notify_type, int) payload['embeds'][0]['icon_url'] = image_url
}]
if self.embed_img: if self.link:
payload['embeds'][0]['icon_url'] = image_url payload['embeds'][0]['url'] = self.link
if self.embed_url: else:
payload['embeds'][0]['url'] = self.embed_url payload['content'] = \
else: body if not title else "{}\n{}".format(title, body)
payload['content'] = \
body if not title else "{}\n{}".format(title, body)
if not self._send(payload): has_error = False
channel_ids = list(self.targets)
for channel_id in channel_ids:
postokay, response = self._send(payload, channel_id)
if not postokay:
# Failed to send message # Failed to send message
return False has_error = True
return True
def _send(self, payload, rate_limit=1, **kwargs): return not has_error
def _send(self, payload, channel_id, retries=1, **kwargs):
""" """
Wrapper to the requests (post) object Wrapper to the requests (post) object
@ -229,12 +240,13 @@ class NotifyRevolt(NotifyBase):
headers = { headers = {
'User-Agent': self.app_id, 'User-Agent': self.app_id,
'X-Bot-Token': self.bot_token, 'X-Bot-Token': self.bot_token,
'Content-Type': 'application/json; charset=utf-8' 'Content-Type': 'application/json; charset=utf-8',
'Accept': 'application/json; charset=utf-8',
} }
notify_url = '{0}channels/{1}/messages'.format( notify_url = '{0}channels/{1}/messages'.format(
self.notify_url, self.notify_url,
self.channel_id channel_id
) )
self.logger.debug('Revolt POST URL: %s (cert_verify=%r)' % ( self.logger.debug('Revolt POST URL: %s (cert_verify=%r)' % (
@ -245,20 +257,22 @@ class NotifyRevolt(NotifyBase):
# By default set wait to None # By default set wait to None
wait = None wait = None
now = datetime.now(timezone.utc).replace(tzinfo=None)
if self.ratelimit_remaining <= 0.0: if self.ratelimit_remaining <= 0.0:
# Determine how long we should wait for or if we should wait at # Determine how long we should wait for or if we should wait at
# all. This isn't fool-proof because we can't be sure the client # all. This isn't fool-proof because we can't be sure the client
# time (calling this script) is completely synced up with the # time (calling this script) is completely synced up with the
# Discord server. One would hope we're on NTP and our clocks are # Discord server. One would hope we're on NTP and our clocks are
# the same allowing this to role smoothly: # the same allowing this to role smoothly:
now = datetime.now(timezone.utc).replace(tzinfo=None)
if now < self.ratelimit_reset: if now < self.ratelimit_reset:
# We need to throttle for the difference in seconds # We need to throttle for the difference in seconds
wait = abs( wait = abs(
(self.ratelimit_reset - now + self.clock_skew) (self.ratelimit_reset - now + self.clock_skew)
.total_seconds()) .total_seconds())
# Default content response object
content = {}
# Always call throttle before any remote server i/o is made; # Always call throttle before any remote server i/o is made;
self.throttle(wait=wait) self.throttle(wait=wait)
@ -271,15 +285,23 @@ class NotifyRevolt(NotifyBase):
timeout=self.request_timeout timeout=self.request_timeout
) )
try:
content = loads(r.content)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
content = {}
# Handle rate limiting (if specified) # Handle rate limiting (if specified)
try: try:
# Store our rate limiting (if provided) # Store our rate limiting (if provided)
self.ratelimit_remaining = \ self.ratelimit_remaining = \
float(r.headers.get( int(r.headers.get('X-RateLimit-Remaining'))
'X-RateLimit-Remaining')) self.ratelimit_reset = \
self.ratelimit_reset = datetime.fromtimestamp( now + timedelta(seconds=(int(
int(r.headers.get('X-RateLimit-Reset')), r.headers.get('X-RateLimit-Reset-After')) / 1000))
timezone.utc).replace(tzinfo=None)
except (TypeError, ValueError): except (TypeError, ValueError):
# This is returned if we could not retrieve this # This is returned if we could not retrieve this
@ -289,23 +311,27 @@ class NotifyRevolt(NotifyBase):
if r.status_code not in ( if r.status_code not in (
requests.codes.ok, requests.codes.no_content): requests.codes.ok, requests.codes.no_content):
# Some details to debug by
self.logger.debug('Response Details:\r\n{}'.format(
content if content else r.content))
# We had a problem # We had a problem
status_str = \ status_str = \
NotifyBase.http_response_code_lookup(r.status_code) NotifyBase.http_response_code_lookup(r.status_code)
self.logger.warning(
'Revolt request limit reached; '
'instructed to throttle for %.3fs',
abs((self.ratelimit_reset - now + self.clock_skew)
.total_seconds()))
if r.status_code == requests.codes.too_many_requests \ if r.status_code == requests.codes.too_many_requests \
and rate_limit > 0: and retries > 0:
# handle rate limiting # Try again
self.logger.warning(
'Revolt rate limiting in effect; '
'blocking for %.2f second(s)',
self.ratelimit_remaining)
# Try one more time before failing
return self._send( return self._send(
payload=payload, payload=payload, channel_id=channel_id,
rate_limit=rate_limit - 1, **kwargs) retries=retries - 1, **kwargs)
self.logger.warning( self.logger.warning(
'Failed to send to Revolt notification: ' 'Failed to send to Revolt notification: '
@ -314,10 +340,8 @@ class NotifyRevolt(NotifyBase):
', ' if status_str else '', ', ' if status_str else '',
r.status_code)) r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done # Return; we're done
return False return (False, content)
else: else:
self.logger.info('Sent Revolt notification.') self.logger.info('Sent Revolt notification.')
@ -326,9 +350,9 @@ class NotifyRevolt(NotifyBase):
self.logger.warning( self.logger.warning(
'A Connection error occurred posting to Revolt.') 'A Connection error occurred posting to Revolt.')
self.logger.debug('Socket Exception: %s' % str(e)) self.logger.debug('Socket Exception: %s' % str(e))
return False return (False, content)
return True return (True, content)
def url(self, privacy=False, *args, **kwargs): def url(self, privacy=False, *args, **kwargs):
""" """
@ -336,32 +360,37 @@ class NotifyRevolt(NotifyBase):
""" """
# Define any URL parameters
params = {} params = {}
if self.embed_img: if self.icon_url:
params['embed_img'] = self.embed_img params['icon_url'] = self.icon_url
if self.embed_url: if self.link:
params['embed_url'] = self.embed_url params['url'] = self.link
params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
return '{schema}://{bot_token}/{channel_id}/?{params}'.format( return '{schema}://{bot_token}/{targets}/?{params}'.format(
schema=self.secure_protocol, schema=self.secure_protocol,
bot_token=self.pprint(self.bot_token, privacy, safe=''), bot_token=self.pprint(self.bot_token, privacy, safe=''),
channel_id=self.pprint(self.channel_id, privacy, safe=''), targets='/'.join(
[self.pprint(x, privacy, safe='') for x in self.targets]),
params=NotifyRevolt.urlencode(params), params=NotifyRevolt.urlencode(params),
) )
def __len__(self):
"""
Returns the number of targets associated with this notification
"""
return 1 if not self.targets else len(self.targets)
@staticmethod @staticmethod
def parse_url(url): def parse_url(url):
""" """
Parses the URL and returns enough arguments that can allow Parses the URL and returns enough arguments that can allow
us to re-instantiate this object. us to re-instantiate this object.
Syntax:
revolt://bot_token/channel_id
""" """
results = NotifyBase.parse_url(url, verify_host=False) results = NotifyBase.parse_url(url, verify_host=False)
if not results: if not results:
@ -371,44 +400,37 @@ class NotifyRevolt(NotifyBase):
# Store our bot token # Store our bot token
bot_token = NotifyRevolt.unquote(results['host']) bot_token = NotifyRevolt.unquote(results['host'])
# Now fetch the channel id # Now fetch the Channel IDs
try: targets = NotifyRevolt.split_path(results['fullpath'])
channel_id = \
NotifyRevolt.split_path(results['fullpath'])[0]
except IndexError:
# Force some bad values that will get caught
# in parsing later
channel_id = None
results['bot_token'] = bot_token results['bot_token'] = bot_token
results['channel_id'] = channel_id results['targets'] = targets
# Text To Speech # Support the 'to' variable so that we can support rooms this way too
results['tts'] = parse_bool(results['qsd'].get('tts', False)) # The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyRevolt.parse_list(results['qsd']['to'])
# Support channel id on the URL string (if specified) # Support channel id on the URL string (if specified)
if 'channel_id' in results['qsd']: if 'channel' in results['qsd']:
results['channel_id'] = \ results['targets'] += \
NotifyRevolt.unquote(results['qsd']['channel_id']) NotifyRevolt.parse_list(results['qsd']['channel'])
# Support bot token on the URL string (if specified) # Support bot token on the URL string (if specified)
if 'bot_token' in results['qsd']: if 'bot_token' in results['qsd']:
results['bot_token'] = \ results['bot_token'] = \
NotifyRevolt.unquote(results['qsd']['bot_token']) NotifyRevolt.unquote(results['qsd']['bot_token'])
# Extract avatar url if it was specified if 'icon_url' in results['qsd']:
if 'embed_img' in results['qsd']: results['icon_url'] = \
results['embed_img'] = \ NotifyRevolt.unquote(results['qsd']['icon_url'])
NotifyRevolt.unquote(results['qsd']['embed_img'])
if 'custom_img' in results['qsd']: if 'url' in results['qsd']:
results['custom_img'] = \ results['link'] = NotifyRevolt.unquote(results['qsd']['url'])
NotifyRevolt.unquote(results['qsd']['custom_img'])
elif 'embed_url' in results['qsd']: if 'format' not in results['qsd'] and (
results['embed_url'] = \ 'url' in results or 'icon_url' in results):
NotifyRevolt.unquote(results['qsd']['embed_url'])
# Markdown is implied # Markdown is implied
results['format'] = NotifyFormat.MARKDOWN results['format'] = NotifyFormat.MARKDOWN

View File

@ -29,7 +29,7 @@
import os import os
from unittest import mock from unittest import mock
from datetime import datetime, timedelta from datetime import datetime, timedelta
from datetime import timezone from json import dumps
import pytest import pytest
import requests import requests
@ -51,6 +51,15 @@ logging.disable(logging.CRITICAL)
# Attachment Directory # Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
# Prepare a Valid Response
REVOLT_GOOD_RESPONSE = dumps({
'_id': 'AAAPWPMMQA2JJB59BR2EASWWWW',
'nonce': '01HPWPPMDJABC2FTDG54CBKKKS',
'channel': '00000000000000000000000000',
'author': '011244Q9S8NCS67KMM9543W7JJ',
'content': 'test',
})
# Our Testing URLs # Our Testing URLs
apprise_url_tests = ( apprise_url_tests = (
('revolt://', { ('revolt://', {
@ -62,82 +71,128 @@ apprise_url_tests = (
}), }),
# No channel_id specified # No channel_id specified
('revolt://%s' % ('i' * 24), { ('revolt://%s' % ('i' * 24), {
'instance': NotifyRevolt,
# Notify will fail
'response': False,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}),
# channel_id specified on url, but no Bot Token
('revolt://?channel=%s' % ('i' * 24), {
'instance': TypeError, 'instance': TypeError,
}), }),
# channel_id specified on url # channel_id specified on url
('revolt://?channel_id=%s' % ('i' * 24), { ('revolt://%s/?channel=%s' % ('i' * 24, 'i' * 24), {
'instance': TypeError, 'instance': NotifyRevolt,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}),
('revolt://%s/?to=%s' % ('i' * 24, 'i' * 24), {
'instance': NotifyRevolt,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}),
('revolt://%s/?to=%s' % ('i' * 24, 'i' * 24), {
'instance': NotifyRevolt,
# an invalid JSON Response
'requests_response_text': '{',
}),
# channel_id specified on url
('revolt://%s/?channel=%s,%%20' % ('i' * 24, 'i' * 24), {
'instance': NotifyRevolt,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
# Provide both a bot token and a channel id # Provide both a bot token and a channel id
('revolt://%s/%s' % ('i' * 24, 't' * 64), { ('revolt://%s/%s' % ('i' * 24, 't' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content, 'requests_response_code': requests.codes.ok,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
# Provide a temporary username ('revolt://_?bot_token=%s&channel=%s' % ('i' * 24, 't' * 64), {
('revolt://l2g@%s/%s' % ('i' * 24, 't' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content, 'requests_response_code': requests.codes.ok,
}), # Our response expected server response
('revolt://l2g@_?bot_token=%s&channel_id=%s' % ('i' * 24, 't' * 64), { 'requests_response_text': REVOLT_GOOD_RESPONSE,
'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content,
}),
# test custom_img= field
('revolt://%s/%s?format=markdown&custom_img=Yes' % (
'i' * 24, 't' * 64), {
'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content,
}),
('revolt://%s/%s?format=markdown&custom_img=No' % (
'i' * 24, 't' * 64), {
'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content,
}), }),
# different format support # different format support
('revolt://%s/%s?format=markdown' % ('i' * 24, 't' * 64), { ('revolt://%s/%s?format=markdown' % ('i' * 24, 't' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content, 'requests_response_code': requests.codes.ok,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
('revolt://%s/%s?format=text' % ('i' * 24, 't' * 64), { ('revolt://%s/%s?format=text' % ('i' * 24, 't' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content, 'requests_response_code': requests.codes.ok,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
# Test with embed_url (title link) # Test with url
('revolt://%s/%s?hmarkdown=true&embed_url=http://localhost' % ( ('revolt://%s/%s?url=http://localhost' % (
'i' * 24, 't' * 64), { 'i' * 24, 't' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content, 'requests_response_code': requests.codes.ok,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
# Test with avatar URL # URL implies markdown unless explicitly set otherwise
('revolt://%s/%s?embed_img=http://localhost/test.jpg' % ( ('revolt://%s/%s?format=text&url=http://localhost' % (
'i' * 24, 't' * 64), { 'i' * 24, 't' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content, 'requests_response_code': requests.codes.ok,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
# Test without image set # Test with Icon URL
('revolt://%s/%s?icon_url=http://localhost/test.jpg' % (
'i' * 24, 't' * 64), {
'instance': NotifyRevolt,
'requests_response_code': requests.codes.ok,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}),
# Icon URL implies markdown unless explicitly set otherwise
('revolt://%s/%s?format=text&icon_url=http://localhost/test.jpg' % (
'i' * 24, 't' * 64), {
'instance': NotifyRevolt,
'requests_response_code': requests.codes.ok,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}),
# Test without any image set
('revolt://%s/%s' % ('i' * 24, 't' * 64), { ('revolt://%s/%s' % ('i' * 24, 't' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
'requests_response_code': requests.codes.no_content, 'requests_response_code': requests.codes.ok,
# don't include an image by default # don't include an image by default
'embed_img': False, 'include_image': False,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
('revolt://%s/%s/' % ('a' * 24, 'b' * 64), { ('revolt://%s/%s/' % ('a' * 24, 'b' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
# force a failure # force a failure
'response': False, 'response': False,
'requests_response_code': requests.codes.internal_server_error, 'requests_response_code': requests.codes.internal_server_error,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
('revolt://%s/%s/' % ('a' * 24, 'b' * 64), { ('revolt://%s/%s/' % ('a' * 24, 'b' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
# throw a bizzare code forcing us to fail to look it up # throw a bizzare code forcing us to fail to look it up
'response': False, 'response': False,
'requests_response_code': 999, 'requests_response_code': 999,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
('revolt://%s/%s/' % ('a' * 24, 'b' * 64), { ('revolt://%s/%s/' % ('a' * 24, 'b' * 64), {
'instance': NotifyRevolt, 'instance': NotifyRevolt,
# Throws a series of connection and transfer exceptions when this flag # Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them # is set and tests that we gracfully handle them
'test_requests_exceptions': True, 'test_requests_exceptions': True,
# Our response expected server response
'requests_response_text': REVOLT_GOOD_RESPONSE,
}), }),
) )
@ -155,7 +210,7 @@ def test_plugin_revolt_urls():
@mock.patch('requests.post') @mock.patch('requests.post')
def test_plugin_revolt_notifications(mock_post): def test_plugin_revolt_notifications(mock_post):
""" """
NotifyRevolt() Notifications/Ping Support NotifyRevolt() Notifications
""" """
@ -166,6 +221,7 @@ def test_plugin_revolt_notifications(mock_post):
# Prepare Mock # Prepare Mock
mock_post.return_value = requests.Request() mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok mock_post.return_value.status_code = requests.codes.ok
mock_post.return_value.content = REVOLT_GOOD_RESPONSE
# Test our header parsing when not lead with a header # Test our header parsing when not lead with a header
body = """ body = """
@ -179,7 +235,7 @@ def test_plugin_revolt_notifications(mock_post):
assert isinstance(results, dict) assert isinstance(results, dict)
assert results['user'] is None assert results['user'] is None
assert results['bot_token'] == bot_token assert results['bot_token'] == bot_token
assert results['channel_id'] == channel_id assert results['targets'] == [channel_id, ]
assert results['password'] is None assert results['password'] is None
assert results['port'] is None assert results['port'] is None
assert results['host'] == bot_token assert results['host'] == bot_token
@ -205,7 +261,7 @@ def test_plugin_revolt_notifications(mock_post):
assert isinstance(results, dict) assert isinstance(results, dict)
assert results['user'] is None assert results['user'] is None
assert results['bot_token'] == bot_token assert results['bot_token'] == bot_token
assert results['channel_id'] == channel_id assert results['targets'] == [channel_id, ]
assert results['password'] is None assert results['password'] is None
assert results['port'] is None assert results['port'] is None
assert results['host'] == bot_token assert results['host'] == bot_token
@ -224,48 +280,42 @@ def test_plugin_revolt_notifications(mock_post):
@mock.patch('requests.post') @mock.patch('requests.post')
def test_plugin_revolt_general(mock_post): @mock.patch('time.sleep')
def test_plugin_revolt_general(mock_sleep, mock_post):
""" """
NotifyRevolt() General Checks NotifyRevolt() General Checks
""" """
# Prevent throttling
mock_sleep.return_value = True
# Turn off clock skew for local testing # Turn off clock skew for local testing
NotifyRevolt.clock_skew = timedelta(seconds=0) NotifyRevolt.clock_skew = timedelta(seconds=0)
# Epoch time:
epoch = datetime.fromtimestamp(0, timezone.utc)
# Initialize some generic (but valid) tokens # Initialize some generic (but valid) tokens
bot_token = 'A' * 24 bot_token = 'A' * 24
channel_id = 'B' * 64 channel_id = ','.join(['B' * 32, 'C' * 32]) + ', ,%%'
# Prepare Mock # Prepare Mock
mock_post.return_value = requests.Request() mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok mock_post.return_value.status_code = requests.codes.ok
mock_post.return_value.content = '' mock_post.return_value.content = REVOLT_GOOD_RESPONSE
mock_post.return_value.headers = { mock_post.return_value.headers = {
'X-RateLimit-Reset': ( 'X-RateLimit-Remaining': 0,
datetime.now(timezone.utc) - epoch).total_seconds(), 'X-RateLimit-Reset-After': 1,
'X-RateLimit-Remaining': 1,
} }
# Invalid bot_token # Invalid bot_token
with pytest.raises(TypeError): with pytest.raises(TypeError):
NotifyRevolt(bot_token=None, channel_id=channel_id) NotifyRevolt(bot_token=None, targets=channel_id)
# Invalid bot_token (whitespace) # Invalid bot_token (whitespace)
with pytest.raises(TypeError): with pytest.raises(TypeError):
NotifyRevolt(bot_token=" ", channel_id=channel_id) NotifyRevolt(bot_token=" ", targets=channel_id)
# Invalid channel_id
with pytest.raises(TypeError):
NotifyRevolt(bot_token=bot_token, channel_id=None)
# Invalid channel_id (whitespace)
with pytest.raises(TypeError):
NotifyRevolt(bot_token=bot_token, channel_id=" ")
obj = NotifyRevolt( obj = NotifyRevolt(
bot_token=bot_token, bot_token=bot_token,
channel_id=channel_id) targets=channel_id)
assert obj.ratelimit_remaining == 1 assert obj.ratelimit_remaining == 1
# Test that we get a string response # Test that we get a string response
@ -277,9 +327,8 @@ def test_plugin_revolt_general(mock_post):
# Force a case where there are no more remaining posts allowed # Force a case where there are no more remaining posts allowed
mock_post.return_value.headers = { mock_post.return_value.headers = {
'X-RateLimit-Reset': (
datetime.now(timezone.utc) - epoch).total_seconds(),
'X-RateLimit-Remaining': 0, 'X-RateLimit-Remaining': 0,
'X-RateLimit-Reset-After': 0,
} }
# This call includes an image with it's payload: # This call includes an image with it's payload:
@ -289,31 +338,31 @@ def test_plugin_revolt_general(mock_post):
# behind the scenes, it should cause us to update our rate limit # behind the scenes, it should cause us to update our rate limit
assert obj.send(body="test") is True assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 0 assert obj.ratelimit_remaining == 0
assert isinstance(obj.ratelimit_reset, datetime)
# This should cause us to block # This should cause us to block
mock_post.return_value.headers = { mock_post.return_value.headers = {
'X-RateLimit-Reset': ( 'X-RateLimit-Remaining': 0,
datetime.now(timezone.utc) - epoch).total_seconds(), 'X-RateLimit-Reset-After': 3000,
'X-RateLimit-Remaining': 10,
} }
assert obj.send(body="test") is True assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 10 assert obj.ratelimit_remaining == 0
assert isinstance(obj.ratelimit_reset, datetime)
# Reset our variable back to 1 # Reset our variable back to 1
mock_post.return_value.headers = { mock_post.return_value.headers = {
'X-RateLimit-Reset': ( 'X-RateLimit-Remaining': 0,
datetime.now(timezone.utc) - epoch).total_seconds(), 'X-RateLimit-Reset-After': 10000,
'X-RateLimit-Remaining': 1,
} }
# Handle cases where our epoch time is wrong del mock_post.return_value.headers['X-RateLimit-Remaining']
del mock_post.return_value.headers['X-RateLimit-Reset']
assert obj.send(body="test") is True assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 0
assert isinstance(obj.ratelimit_reset, datetime)
# Return our object, but place it in the future forcing us to block # Return our object, but place it in the future forcing us to block
mock_post.return_value.headers = { mock_post.return_value.headers = {
'X-RateLimit-Reset': (
datetime.now(timezone.utc) - epoch).total_seconds() + 1,
'X-RateLimit-Remaining': 0, 'X-RateLimit-Remaining': 0,
'X-RateLimit-Reset-After': 0,
} }
obj.ratelimit_remaining = 0 obj.ratelimit_remaining = 0
@ -329,9 +378,8 @@ def test_plugin_revolt_general(mock_post):
# Return our object, but place it in the future forcing us to block # Return our object, but place it in the future forcing us to block
mock_post.return_value.status_code = requests.codes.ok mock_post.return_value.status_code = requests.codes.ok
mock_post.return_value.headers = { mock_post.return_value.headers = {
'X-RateLimit-Reset': (
datetime.now(timezone.utc) - epoch).total_seconds() - 1,
'X-RateLimit-Remaining': 0, 'X-RateLimit-Remaining': 0,
'X-RateLimit-Reset-After': 0,
} }
assert obj.send(body="test") is True assert obj.send(body="test") is True
@ -340,9 +388,8 @@ def test_plugin_revolt_general(mock_post):
# Return our headers to normal # Return our headers to normal
mock_post.return_value.headers = { mock_post.return_value.headers = {
'X-RateLimit-Reset': ( 'X-RateLimit-Remaining': 0,
datetime.now(timezone.utc) - epoch).total_seconds(), 'X-RateLimit-Reset-After': 1,
'X-RateLimit-Remaining': 1,
} }
# This call includes an image with it's payload: # This call includes an image with it's payload:
@ -380,6 +427,7 @@ def test_plugin_revolt_overflow(mock_post):
# Prepare Mock # Prepare Mock
mock_post.return_value = requests.Request() mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok mock_post.return_value.status_code = requests.codes.ok
mock_post.return_value.content = REVOLT_GOOD_RESPONSE
# Some variables we use to control the data we work with # Some variables we use to control the data we work with
body_len = 2005 body_len = 2005
@ -401,7 +449,7 @@ def test_plugin_revolt_overflow(mock_post):
assert isinstance(results, dict) assert isinstance(results, dict)
assert results['user'] is None assert results['user'] is None
assert results['bot_token'] == bot_token assert results['bot_token'] == bot_token
assert results['channel_id'] == channel_id assert results['targets'] == [channel_id, ]
assert results['password'] is None assert results['password'] is None
assert results['port'] is None assert results['port'] is None
assert results['host'] == bot_token assert results['host'] == bot_token
@ -436,6 +484,7 @@ def test_plugin_revolt_markdown_extra(mock_post):
# Prepare Mock # Prepare Mock
mock_post.return_value = requests.Request() mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok mock_post.return_value.status_code = requests.codes.ok
mock_post.return_value.content = REVOLT_GOOD_RESPONSE
# Reset our apprise object # Reset our apprise object
a = Apprise() a = Apprise()