mirror of
https://github.com/caronc/apprise.git
synced 2024-12-04 22:11:25 +01:00
794 lines
27 KiB
Python
794 lines
27 KiB
Python
# -*- coding: utf-8 -*-
|
|
# BSD 2-Clause License
|
|
#
|
|
# Apprise - Push Notification Library.
|
|
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions and the following disclaimer.
|
|
#
|
|
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions and the following disclaimer in the documentation
|
|
# and/or other materials provided with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
import os
|
|
from unittest import mock
|
|
from datetime import datetime, timedelta
|
|
from datetime import timezone
|
|
import pytest
|
|
import requests
|
|
from json import loads
|
|
|
|
from apprise.plugins.discord import NotifyDiscord
|
|
from helpers import AppriseURLTester
|
|
from apprise import Apprise
|
|
from apprise import AppriseAttachment
|
|
from apprise import NotifyType
|
|
from apprise import NotifyFormat
|
|
from apprise.common import OverflowMode
|
|
|
|
from random import choice
|
|
from string import ascii_uppercase as str_alpha
|
|
from string import digits as str_num
|
|
|
|
# Disable logging for a cleaner testing output
|
|
import logging
|
|
logging.disable(logging.CRITICAL)
|
|
|
|
# Directory (next to this test module) that holds the attachment fixtures
TEST_VAR_DIR = os.path.join(
    os.path.dirname(__file__),
    'var',
)
|
|
|
|
# Our Testing URLs
#
# Each entry is a (url, expectations) pair consumed by AppriseURLTester:
#  - 'instance' is the class the URL must instantiate to, or the exception
#    type that must be raised while doing so.
#  - the remaining keys tune the mocked HTTP response and the checks made.
apprise_url_tests = (
    ('discord://', {
        'instance': TypeError,
    }),
    # An invalid url
    ('discord://:@/', {
        'instance': TypeError,
    }),
    # No webhook_token specified
    ('discord://%s' % ('i' * 24), {
        'instance': TypeError,
    }),
    # Provide both a webhook id and a webhook token
    ('discord://%s/%s' % ('i' * 24, 't' * 64), {
        'instance': NotifyDiscord,
        'requests_response_code': requests.codes.no_content,
    }),
    # Provide a temporary username
    ('discord://l2g@%s/%s' % ('i' * 24, 't' * 64), {
        'instance': NotifyDiscord,
        'requests_response_code': requests.codes.no_content,
    }),
    # test image= field
    ('discord://%s/%s?format=markdown&footer=Yes&image=Yes' % (
        'i' * 24, 't' * 64), {
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
            # don't include an image by default
            'include_image': False,
    }),
    ('discord://%s/%s?format=markdown&footer=Yes&image=No&fields=no' % (
        'i' * 24, 't' * 64), {
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    ('discord://%s/%s?format=markdown&footer=Yes&image=Yes' % (
        'i' * 24, 't' * 64), {
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    ('https://discord.com/api/webhooks/{}/{}'.format(
        '0' * 10, 'B' * 40), {
            # Native URL Support, support the provided discord URL from their
            # webpage.
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    ('https://discordapp.com/api/webhooks/{}/{}'.format(
        '0' * 10, 'B' * 40), {
            # Legacy Native URL Support, support the older URL (to be
            # decommissioned on Nov 7th 2020)
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    ('https://discordapp.com/api/webhooks/{}/{}?footer=yes'.format(
        '0' * 10, 'B' * 40), {
            # Native URL Support with arguments
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    ('discord://%s/%s?format=markdown&avatar=No&footer=No' % (
        'i' * 24, 't' * 64), {
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    # different format support
    ('discord://%s/%s?format=markdown' % ('i' * 24, 't' * 64), {
        'instance': NotifyDiscord,
        'requests_response_code': requests.codes.no_content,
    }),
    # Thread ID
    ('discord://%s/%s?format=markdown&thread=abc123' % (
        'i' * 24, 't' * 64), {
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    ('discord://%s/%s?format=text' % ('i' * 24, 't' * 64), {
        'instance': NotifyDiscord,
        'requests_response_code': requests.codes.no_content,
    }),
    # Test with href (title link)
    # (the query previously read `hmarkdown=true&ref=`, which never set the
    # href argument this entry is meant to cover)
    ('discord://%s/%s?markdown=true&href=http://localhost' % (
        'i' * 24, 't' * 64), {
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    # Test with url (title link) - Alias of href
    ('discord://%s/%s?markdown=true&url=http://localhost' % (
        'i' * 24, 't' * 64), {
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    # Test with avatar URL
    ('discord://%s/%s?avatar_url=http://localhost/test.jpg' % (
        'i' * 24, 't' * 64), {
            'instance': NotifyDiscord,
            'requests_response_code': requests.codes.no_content,
    }),
    # Test without image set
    ('discord://%s/%s' % ('i' * 24, 't' * 64), {
        'instance': NotifyDiscord,
        'requests_response_code': requests.codes.no_content,
        # don't include an image by default
        'include_image': False,
    }),
    ('discord://%s/%s/' % ('a' * 24, 'b' * 64), {
        'instance': NotifyDiscord,
        # force a failure
        'response': False,
        'requests_response_code': requests.codes.internal_server_error,
    }),
    ('discord://%s/%s/' % ('a' * 24, 'b' * 64), {
        'instance': NotifyDiscord,
        # throw a bizarre code forcing us to fail to look it up
        'response': False,
        'requests_response_code': 999,
    }),
    ('discord://%s/%s/' % ('a' * 24, 'b' * 64), {
        'instance': NotifyDiscord,
        # Throws a series of connection and transfer exceptions when this flag
        # is set and tests that we gracefully handle them
        'test_requests_exceptions': True,
    }),
)
|
|
|
|
|
|
def test_plugin_discord_urls():
    """
    NotifyDiscord() Apprise URLs

    Hands every entry of `apprise_url_tests` to AppriseURLTester and runs
    them all.
    """

    tester = AppriseURLTester(tests=apprise_url_tests)
    tester.run_all()
|
|
|
|
|
|
@mock.patch('requests.post')
def test_plugin_discord_notifications(mock_post):
    """
    NotifyDiscord() Notifications/Ping Support

    Sends the same mention-laden body once per message format (markdown,
    then text) and checks the `allow_mentions` entry of the posted payload.
    """

    # Generic (but valid) tokens
    webhook_id = 'A' * 24
    webhook_token = 'B' * 64

    # Every call to requests.post() hands back a successful response
    mock_post.return_value = requests.Request()
    mock_post.return_value.status_code = requests.codes.ok

    # A body holding a user mention (<@123>), a role mention (<@&456>) and
    # two keyword mentions (@everyone, @admin)
    body = """
    # Heading
    @everyone and @admin, wake and meet our new user <@123>; <@&456>"
    """

    expected_path = f'/{webhook_token}/'
    expected_post_url = \
        f'https://discord.com/api/webhooks/{webhook_id}/{webhook_token}'

    for body_format in ('markdown', 'text'):
        # Start every pass with clean call tracking
        mock_post.reset_mock()

        results = NotifyDiscord.parse_url(
            f'discord://{webhook_id}/{webhook_token}/?format={body_format}')

        # Verify how the URL was broken apart
        assert isinstance(results, dict)
        for unset in ('user', 'password', 'port', 'query'):
            assert results[unset] is None
        assert results['webhook_id'] == webhook_id
        assert results['webhook_token'] == webhook_token
        assert results['host'] == webhook_id
        assert results['fullpath'] == expected_path
        assert results['path'] == expected_path
        assert results['schema'] == 'discord'
        assert results['url'] == f'discord://{webhook_id}/{webhook_token}/'

        instance = NotifyDiscord(**results)
        assert isinstance(instance, NotifyDiscord)

        # Exactly one post is expected for this body
        assert instance.send(body=body) is True
        assert mock_post.call_count == 1

        post_args, post_kwargs = mock_post.call_args_list[0]
        assert post_args[0] == expected_post_url

        payload = loads(post_kwargs['data'])

        assert 'allow_mentions' in payload
        mentions = payload['allow_mentions']

        # Our single user mention
        assert 'users' in mentions
        assert len(mentions['users']) == 1
        assert '123' in mentions['users']

        # Our single role mention
        assert 'roles' in mentions
        assert len(mentions['roles']) == 1
        assert '456' in mentions['roles']

        # Our two keyword mentions
        assert 'parse' in mentions
        assert len(mentions['parse']) == 2
        assert 'everyone' in mentions['parse']
        assert 'admin' in mentions['parse']
|
|
|
|
|
|
@mock.patch('requests.post')
@mock.patch('time.sleep')
def test_plugin_discord_general(mock_sleep, mock_post):
    """
    NotifyDiscord() General Checks

    Walks through, in order: constructor validation, the rate-limit header
    handling performed while sending, extract_markdown_sections() parsing,
    sending markdown/text bodies through an Apprise instance and, finally,
    the `thread` URL argument.

    The rate-limit portion relies on the exact order of the statements below;
    each step changes the mocked response headers and/or the object's
    `ratelimit_remaining` before the next send.
    """

    # Prevent throttling (time.sleep is mocked so nothing really waits)
    mock_sleep.return_value = True

    # Turn off clock skew for local testing
    # NOTE(review): this rebinds a class attribute and it is not restored
    # anywhere in this function.
    NotifyDiscord.clock_skew = timedelta(seconds=0)

    # Epoch time:
    epoch = datetime.fromtimestamp(0, timezone.utc)

    # Initialize some generic (but valid) tokens
    webhook_id = 'A' * 24
    webhook_token = 'B' * 64

    # Prepare Mock; the rate limit reset is expressed as seconds since epoch
    mock_post.return_value = requests.Request()
    mock_post.return_value.status_code = requests.codes.ok
    mock_post.return_value.content = ''
    mock_post.return_value.headers = {
        'X-RateLimit-Reset': (
            datetime.now(timezone.utc) - epoch).total_seconds(),
        'X-RateLimit-Remaining': 1,
    }

    # Invalid webhook id
    with pytest.raises(TypeError):
        NotifyDiscord(webhook_id=None, webhook_token=webhook_token)
    # Invalid webhook id (whitespace)
    with pytest.raises(TypeError):
        NotifyDiscord(webhook_id="  ", webhook_token=webhook_token)

    # Invalid webhook token
    with pytest.raises(TypeError):
        NotifyDiscord(webhook_id=webhook_id, webhook_token=None)
    # Invalid webhook token (whitespace)
    with pytest.raises(TypeError):
        NotifyDiscord(webhook_id=webhook_id, webhook_token="   ")

    obj = NotifyDiscord(
        webhook_id=webhook_id,
        webhook_token=webhook_token,
        footer=True, thumbnail=False)
    # A freshly created object reports one remaining request
    assert obj.ratelimit_remaining == 1

    # Test that we get a string response
    assert isinstance(obj.url(), str) is True

    # Send a notification carrying both a title and a body
    assert obj.notify(
        body='body', title='title', notify_type=NotifyType.INFO) is True

    # Force a case where there are no more remaining posts allowed
    mock_post.return_value.headers = {
        'X-RateLimit-Reset': (
            datetime.now(timezone.utc) - epoch).total_seconds(),
        'X-RateLimit-Remaining': 0,
    }

    # Send a notification carrying both a title and a body
    assert obj.notify(
        body='body', title='title', notify_type=NotifyType.INFO) is True

    # behind the scenes, it should cause us to update our rate limit
    assert obj.send(body="test") is True
    assert obj.ratelimit_remaining == 0

    # This should cause us to block
    mock_post.return_value.headers = {
        'X-RateLimit-Reset': (
            datetime.now(timezone.utc) - epoch).total_seconds(),
        'X-RateLimit-Remaining': 10,
    }
    assert obj.send(body="test") is True
    assert obj.ratelimit_remaining == 10

    # Reset our variable back to 1
    mock_post.return_value.headers = {
        'X-RateLimit-Reset': (
            datetime.now(timezone.utc) - epoch).total_seconds(),
        'X-RateLimit-Remaining': 1,
    }
    # Handle cases where our epoch time is wrong; here the reset header is
    # removed from the response altogether
    del mock_post.return_value.headers['X-RateLimit-Reset']
    assert obj.send(body="test") is True

    # Return our object, but place it in the future forcing us to block
    mock_post.return_value.headers = {
        'X-RateLimit-Reset': (
            datetime.now(timezone.utc) - epoch).total_seconds() + 1,
        'X-RateLimit-Remaining': 0,
    }

    obj.ratelimit_remaining = 0
    assert obj.send(body="test") is True

    # Test 429 error response
    mock_post.return_value.status_code = requests.codes.too_many_requests

    # The below will attempt a second transmission and fail (because we didn't
    # set up a second post request to pass) :)
    assert obj.send(body="test") is False

    # Return our object, but this time place the reset one second in the past
    mock_post.return_value.status_code = requests.codes.ok
    mock_post.return_value.headers = {
        'X-RateLimit-Reset': (
            datetime.now(timezone.utc) - epoch).total_seconds() - 1,
        'X-RateLimit-Remaining': 0,
    }
    assert obj.send(body="test") is True

    # Return our limits to always work
    obj.ratelimit_remaining = 1

    # Return our headers to normal
    mock_post.return_value.headers = {
        'X-RateLimit-Reset': (
            datetime.now(timezone.utc) - epoch).total_seconds(),
        'X-RateLimit-Remaining': 1,
    }

    # Send a notification carrying both a title and a body
    assert obj.notify(
        body='body', title='title', notify_type=NotifyType.INFO) is True

    # Simple Markdown Single line of text; no headers means no sections
    test_markdown = "body"
    desc, results = obj.extract_markdown_sections(test_markdown)
    assert isinstance(results, list) is True
    assert len(results) == 0

    # Test our header parsing when the text does not lead with a header
    test_markdown = """
    A section of text that has no header at the top.
    It also has a hash tag # <- in the middle of a
    string.

    ## Heading 1
    body

    # Heading 2

    more content
    on multi-lines
    """

    desc, results = obj.extract_markdown_sections(test_markdown)
    # we have a description (the text found ahead of the first header)
    assert isinstance(desc, str) is True
    assert desc.startswith('A section of text that has no header at the top.')
    assert desc.endswith('string.')

    # One entry per header; the content is wrapped in a ```md code fence
    assert isinstance(results, list) is True
    assert len(results) == 2
    assert results[0]['name'] == 'Heading 1'
    assert results[0]['value'] == '```md\nbody\n```'
    assert results[1]['name'] == 'Heading 2'
    assert results[1]['value'] == \
        '```md\nmore content\n    on multi-lines\n```'

    # Test our header parsing
    test_markdown = "## Heading one\nbody body\n\n" + \
        "# Heading 2 ##\n\nTest\n\n" + \
        "more content\n" + \
        "even more content \t\r\n\n\n" + \
        "# Heading 3 ##\n\n\n" + \
        "normal content\n" + \
        "# heading 4\n" + \
        "#### Heading 5"

    desc, results = obj.extract_markdown_sections(test_markdown)
    assert isinstance(results, list) is True
    # No desc details filled out
    assert isinstance(desc, str) is True
    assert not desc

    # We should have 5 sections (since there are 5 headers identified above)
    assert len(results) == 5
    assert results[0]['name'] == 'Heading one'
    assert results[0]['value'] == '```md\nbody body\n```'
    assert results[1]['name'] == 'Heading 2'
    assert results[1]['value'] == \
        '```md\nTest\n\nmore content\neven more content\n```'
    assert results[2]['name'] == 'Heading 3'
    assert results[2]['value'] == \
        '```md\nnormal content\n```'
    # Headers without any content produce an empty (untagged) code fence
    assert results[3]['name'] == 'heading 4'
    assert results[3]['value'] == '```\n```'
    assert results[4]['name'] == 'Heading 5'
    assert results[4]['value'] == '```\n```'

    # Create an apprise instance
    a = Apprise()

    # Our processing is slightly different when we aren't using markdown
    # as we do not pre-parse content during our notifications
    assert a.add(
        'discord://{webhook_id}/{webhook_token}/'
        '?format=markdown&footer=Yes'.format(
            webhook_id=webhook_id,
            webhook_token=webhook_token)) is True

    # Limit the plugin to a single field
    # NOTE(review): this rebinds a class attribute and it is not restored
    # anywhere in this function; presumably it makes the sections above get
    # spread across several posts - confirm against the plugin.
    NotifyDiscord.discord_max_fields = 1

    assert a.notify(body=test_markdown, title='title',
                    notify_type=NotifyType.INFO,
                    body_format=NotifyFormat.TEXT) is True

    # Throw an exception on the fourth call to requests.post()
    # This allows us to test our batch field processing
    response = mock.Mock()
    response.content = ''
    response.status_code = requests.codes.ok
    mock_post.return_value = response
    mock_post.side_effect = [
        response, response, response, requests.RequestException()]

    # Test our markdown
    obj = Apprise.instantiate(
        'discord://{}/{}/?format=markdown'.format(webhook_id, webhook_token))
    assert isinstance(obj, NotifyDiscord)
    assert obj.notify(
        body=test_markdown, title='title',
        notify_type=NotifyType.INFO) is False
    # Stop raising; fall back to mock_post.return_value from here on
    mock_post.side_effect = None

    # Empty String
    desc, results = obj.extract_markdown_sections("")
    assert isinstance(results, list) is True
    assert len(results) == 0

    # No desc details filled out
    assert isinstance(desc, str) is True
    assert not desc

    # String without Heading
    test_markdown = "Just a string without any header entries.\n" + \
        "A second line"
    desc, results = obj.extract_markdown_sections(test_markdown)
    assert isinstance(results, list) is True
    assert len(results) == 0

    # With no headers present the whole string is returned as the description
    assert isinstance(desc, str) is True
    assert desc == 'Just a string without any header entries.\n' + \
        'A second line'

    # Use our test markdown string during a notification
    assert obj.notify(
        body=test_markdown, title='title', notify_type=NotifyType.INFO) is True

    # Create an apprise instance
    a = Apprise()

    # Our processing is slightly different when we aren't using markdown
    # as we do not pre-parse content during our notifications
    assert a.add(
        'discord://{webhook_id}/{webhook_token}/'
        '?format=markdown&footer=Yes'.format(
            webhook_id=webhook_id,
            webhook_token=webhook_token)) is True

    # Send the same content flagged first as text, then as markdown
    assert a.notify(body=test_markdown, title='title',
                    notify_type=NotifyType.INFO,
                    body_format=NotifyFormat.TEXT) is True

    assert a.notify(body=test_markdown, title='title',
                    notify_type=NotifyType.INFO,
                    body_format=NotifyFormat.MARKDOWN) is True

    # Toggle our logo availability
    a.asset.image_url_logo = None
    assert a.notify(
        body='body', title='title', notify_type=NotifyType.INFO) is True

    # Create an apprise instance
    a = Apprise()

    # Reset our object
    mock_post.reset_mock()

    # Test our threading
    assert a.add(
        'discord://{webhook_id}/{webhook_token}/'
        '?thread=12345'.format(
            webhook_id=webhook_id,
            webhook_token=webhook_token)) is True

    # A single post is expected for this notification
    assert a.notify(body='test', title='title') is True

    assert mock_post.call_count == 1
    # Keyword arguments of the one and only post; the thread given on the URL
    # must be passed along as the `thread_id` request parameter
    response = mock_post.call_args_list[0][1]
    assert 'params' in response
    assert response['params'].get('thread_id') == '12345'
|
|
|
|
|
|
@mock.patch('requests.post')
def test_plugin_discord_overflow(mock_post):
    """
    NotifyDiscord() Overflow Checks

    Splits an oversized random title/body pair and verifies each resulting
    chunk respects the instance's title and body length limits.
    """

    # Generic (but valid) tokens
    webhook_id = 'A' * 24
    webhook_token = 'B' * 64

    # Every call to requests.post() hands back a successful response
    mock_post.return_value = requests.Request()
    mock_post.return_value.status_code = requests.codes.ok

    # Sizes of the content we generate
    body_len = 8000
    title_len = 1024

    # Number of characters per line
    row = 24

    # A large random body, broken into rows separated by CRLF
    body = ''.join(
        choice(str_alpha + str_num + ' ') for _ in range(body_len))
    rows = [body[start: start + row] for start in range(0, len(body), row)]
    body = '\r\n'.join(rows)

    # A large random title (no whitespace)
    title = ''.join(choice(str_alpha + str_num) for _ in range(title_len))

    results = NotifyDiscord.parse_url(
        f'discord://{webhook_id}/{webhook_token}/?overflow=split')

    # Verify how the URL was broken apart
    assert isinstance(results, dict)
    for unset in ('user', 'password', 'port', 'query'):
        assert results[unset] is None
    assert results['webhook_id'] == webhook_id
    assert results['webhook_token'] == webhook_token
    assert results['host'] == webhook_id
    expected_path = f'/{webhook_token}/'
    assert results['fullpath'] == expected_path
    assert results['path'] == expected_path
    assert results['schema'] == 'discord'
    assert results['url'] == f'discord://{webhook_id}/{webhook_token}/'

    instance = NotifyDiscord(**results)
    assert isinstance(instance, NotifyDiscord)

    chunks = instance._apply_overflow(
        body, title=title, overflow=OverflowMode.SPLIT)

    # No chunk may exceed the limits advertised by the instance
    for chunk in chunks:
        title_size = len(chunk['title'])
        assert title_size <= instance.title_maxlen
        assert title_size + len(chunk['body']) <= instance.body_maxlen
|
|
|
|
|
|
@mock.patch('requests.post')
def test_plugin_discord_markdown_extra(mock_post):
    """
    NotifyDiscord() Markdown Extra Checks

    Sends a body consisting solely of a markdown link through a
    markdown-formatted Discord URL.
    """

    # Generic (but valid) tokens
    webhook_id = 'A' * 24
    webhook_token = 'B' * 64

    # Every call to requests.post() hands back a successful response
    mock_post.return_value = requests.Request()
    mock_post.return_value.status_code = requests.codes.ok

    # Start from a fresh apprise object
    a = Apprise()

    # Further markdown coverage to accommodate the bug raised on 2022.10.25;
    # see https://github.com/caronc/apprise/issues/717
    url = (
        f'discord://{webhook_id}/{webhook_token}/'
        '?format=markdown&footer=Yes'
    )
    assert a.add(url) is True

    test_markdown = "[green-blue](https://google.com)"

    # The link is sent while flagging the body as plain text
    sent = a.notify(
        body=test_markdown,
        title='title',
        notify_type=NotifyType.INFO,
        body_format=NotifyFormat.TEXT,
    )
    assert sent is True

    # Followed by an ordinary notification
    sent = a.notify(
        body='body', title='title', notify_type=NotifyType.INFO)
    assert sent is True
|
|
|
|
|
|
@mock.patch('requests.post')
def test_plugin_discord_attachments(mock_post):
    """
    NotifyDiscord() Attachment Checks

    Covers sending with a valid attachment (two posts are expected), sending
    with a path to a file that does not exist, and failures raised or
    returned by requests.post() on either the first or the second post.
    """

    # Initialize some generic (but valid) tokens
    webhook_id = 'C' * 24
    webhook_token = 'D' * 64

    # The endpoint every post is expected to be sent to
    webhook_url = 'https://discord.com/api/webhooks/{}/{}'.format(
        webhook_id, webhook_token)

    # Prepare a good response
    response = mock.Mock()
    response.status_code = requests.codes.ok

    # Prepare a bad response
    bad_response = mock.Mock()
    bad_response.status_code = requests.codes.internal_server_error

    # Prepare Mock return object
    mock_post.return_value = response

    # Test our markdown
    obj = Apprise.instantiate(
        'discord://{}/{}/?format=markdown'.format(webhook_id, webhook_token))

    # attach our content
    attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))

    assert obj.notify(
        body='body', title='title', notify_type=NotifyType.INFO,
        attach=attach) is True

    # Test our call count; both posts target the webhook
    assert mock_post.call_count == 2
    assert mock_post.call_args_list[0][0][0] == webhook_url
    assert mock_post.call_args_list[1][0][0] == webhook_url

    # Reset our object
    mock_post.reset_mock()

    # Test notifications with mentions and attachments in it
    assert obj.notify(
        body='Say hello to <@1234>!', notify_type=NotifyType.INFO,
        attach=attach) is True

    # Test our call count; both posts target the webhook
    assert mock_post.call_count == 2
    assert mock_post.call_args_list[0][0][0] == webhook_url
    assert mock_post.call_args_list[1][0][0] == webhook_url

    # Reset our object
    mock_post.reset_mock()

    # An invalid attachment will cause a failure.  The path components are
    # kept relative on purpose: os.path.join() throws away everything ahead
    # of an absolute component, which would drop TEST_VAR_DIR.
    path = os.path.join(
        TEST_VAR_DIR, 'invalid', 'path', 'to', 'an', 'invalid', 'file.jpg')
    assert obj.notify(
        body='body', title='title', notify_type=NotifyType.INFO,
        attach=path) is False

    # From here on failures are driven through side_effect only
    mock_post.return_value = None

    # Fail on the first call to requests.post(); either by raising or by
    # returning a bad response
    for side_effect in (requests.RequestException(), OSError(), bad_response):
        mock_post.side_effect = [side_effect]

        # We'll fail now because of our error handling
        assert obj.send(body="test", attach=attach) is False

    # Fail on the second call to requests.post(); the first one succeeds
    for side_effect in (requests.RequestException(), OSError(), bad_response):
        mock_post.side_effect = [response, side_effect]

        # We'll fail now because of our error handling
        assert obj.send(body="test", attach=attach) is False

    # Explicitly handle a good response followed by a bad one
    mock_post.side_effect = [response, bad_response]

    # We'll fail now because of the bad response on the second post
    assert obj.send(body="test", attach=attach) is False
|