Ntfy attachment support added (#571)

This commit is contained in:
Chris Caron 2022-04-18 17:07:47 -04:00 committed by GitHub
parent 41fe862241
commit 65d447ec4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 306 additions and 99 deletions

View File

@ -29,6 +29,7 @@ import re
import requests
import six
from json import loads
from json import dumps
from os.path import basename
from .NotifyBase import NotifyBase
@ -39,6 +40,7 @@ from ..utils import is_hostname
from ..utils import is_ipaddr
from ..utils import validate_regex
from ..URLBase import PrivacyMode
from ..attachment.AttachBase import AttachBase
class NtfyMode(object):
@ -258,7 +260,8 @@ class NotifyNtfy(NotifyBase):
self.topics.append(topic)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform ntfy Notification
"""
@ -271,41 +274,71 @@ class NotifyNtfy(NotifyBase):
self.logger.warning('There are no ntfy topics to notify')
return False
# Create a copy of the subreddits list
topics = list(self.topics)
while len(topics) > 0:
# Retrieve our topic
topic = topics.pop()
if attach:
# We need to upload our payload first so that we can source it
# in remaining messages
for no, attachment in enumerate(attach):
# First message only includes the text
_body = body if not no else None
_title = title if not no else None
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Preparing ntfy attachment {}'.format(
attachment.url(privacy=True)))
okay, response = self._send(
topic, body=_body, title=_title, attach=attachment)
if not okay:
# We can't post our attachment; abort immediately
return False
else:
# Send our Notification Message
okay, response = self._send(topic, body=body, title=title)
if not okay:
# Mark our failure, but contiue to move on
has_error = True
return not has_error
def _send(self, topic, body=None, title=None, attach=None, **kwargs):
"""
Wrapper to the requests (post) object
"""
# Prepare our headers
headers = {
'User-Agent': self.app_id,
}
if self.priority != NtfyPriority.NORMAL:
headers['X-Priority'] = self.priority
# Some default values for our request object to which we'll update
# depending on what our payload is
files = None
if title:
headers['X-Title'] = title
# See https://ntfy.sh/docs/publish/#publish-as-json
data = {}
if self.attach is not None:
headers['X-Attach'] = self.attach
if self.filename is not None:
headers['X-Filename'] = self.filename
if self.click is not None:
headers['X-Click'] = self.click
if self.delay is not None:
headers['X-Delay'] = self.delay
if self.email is not None:
headers['X-Email'] = self.email
if self.__tags:
headers['X-Tags'] = ",".join(self.__tags)
# Prepare our payload
payload = body
# Posting Parameters
params = {}
auth = None
if self.mode == NtfyMode.CLOUD:
# Cloud Service
template_url = self.cloud_notify_url
notify_url = self.cloud_notify_url
else: # NotifyNtfy.PRVATE
# Allow more settings to be applied now
@ -315,91 +348,142 @@ class NotifyNtfy(NotifyBase):
# Prepare our ntfy Template URL
schema = 'https' if self.secure else 'http'
template_url = '%s://%s' % (schema, self.host)
notify_url = '%s://%s' % (schema, self.host)
if isinstance(self.port, int):
template_url += ':%d' % self.port
notify_url += ':%d' % self.port
template_url += '/{topic}'
if not attach:
headers['Content-Type'] = 'application/json'
# Create a copy of the subreddits list
topics = list(self.topics)
while len(topics) > 0:
# Retrieve our topic
topic = topics.pop()
data['topic'] = topic
virt_payload = data
# Create our Posting URL per topic provided
url = template_url.format(topic=topic)
self.logger.debug('ntfy POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('ntfy Payload: %s' % str(payload))
else:
# Point our payload to our parameters
virt_payload = params
notify_url += '/{topic}'.format(topic=topic)
# Always call throttle before any remote server i/o is made
self.throttle()
if title:
virt_payload['title'] = title
try:
r = requests.post(
url,
data=payload,
headers=headers,
auth=auth,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if body:
virt_payload['message'] = body
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(r.status_code)
if self.priority != NtfyPriority.NORMAL:
headers['X-Priority'] = self.priority
# set up our status code to use
status_code = r.status_code
if self.delay is not None:
headers['X-Delay'] = self.delay
try:
# Update our status response if we can
json_response = loads(r.content)
status_str = json_response.get('error', status_str)
status_code = \
int(json_response.get('code', status_code))
if self.click is not None:
headers['X-Click'] = self.click
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
if self.email is not None:
headers['X-Email'] = self.email
# We could not parse JSON response.
# We will just use the status we already have.
pass
if self.__tags:
headers['X-Tags'] = ",".join(self.__tags)
self.logger.warning(
"Failed to send ntfy notification to topic '{}': "
'{}{}error={}.'.format(
topic,
status_str,
', ' if status_str else '',
status_code))
if isinstance(attach, AttachBase):
# Prepare our Header
params['filename'] = attach.name
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# prepare our files object
files = {'file': (attach.name, open(attach.path, 'rb'))}
# Mark our failure
has_error = True
elif self.attach is not None:
data['attach'] = self.attach
if self.filename is not None:
data['filename'] = self.filename
else:
self.logger.info(
"Sent ntfy notification to '{}'.".format(url))
self.logger.debug('ntfy POST URL: %s (cert_verify=%r)' % (
notify_url, self.verify_certificate,
))
self.logger.debug('ntfy Payload: %s' % str(virt_payload))
self.logger.debug('ntfy Headers: %s' % str(headers))
# Always call throttle before any remote server i/o is made
self.throttle()
# Default response type
response = None
try:
r = requests.post(
notify_url,
params=params if params else None,
data=dumps(data) if data else None,
headers=headers,
files=files,
auth=auth,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(r.status_code)
# set up our status code to use
status_code = r.status_code
try:
# Update our status response if we can
response = loads(r.content)
status_str = response.get('error', status_str)
status_code = \
int(response.get('code', status_code))
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
# We could not parse JSON response.
# We will just use the status we already have.
pass
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending ntfy:%s ' % (
url) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
"Failed to send ntfy notification to topic '{}': "
'{}{}error={}.'.format(
topic,
status_str,
', ' if status_str else '',
status_code))
# Mark our failure
has_error = True
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return not has_error
return False, response
# otherwise we were successful
self.logger.info(
"Sent ntfy notification to '{}'.".format(notify_url))
return True, response
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending ntfy:%s ' % (
notify_url) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
return False, response
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while handling {}.'.format(
attach.name if isinstance(attach, AttachBase)
else virt_payload))
self.logger.debug('I/O Exception: %s' % str(e))
return False, response
finally:
# Close our file (if it's open) stored in the second element
# of our files tuple (index 1)
if files:
files['file'][1].close()
def url(self, privacy=False, *args, **kwargs):
"""

View File

@ -22,17 +22,23 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import json
import mock
import requests
from apprise import Apprise
from apprise import plugins
from apprise import NotifyType
from helpers import AppriseURLTester
from apprise import AppriseAttachment
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
# For testing our return response
GOOD_RESPONSE_TEXT = {
'code': '0',
@ -66,7 +72,7 @@ apprise_url_tests = (
'response': False,
}),
# No topics
('ntfy://user:pass@localhost', {
('ntfy://user:pass@localhost?mode=private', {
'instance': plugins.NotifyNtfy,
# invalid topics specified (nothing to notify)
# as a result the response type will be false
@ -197,7 +203,7 @@ apprise_url_tests = (
('ntfys://user:web@-_/topic1/topic2/?mode=private', {
'instance': None,
}),
('ntfy://user:pass@localhost:8081/topic/topic2', {
('ntfy://user:pass@localhost:8089/topic/topic2', {
'instance': plugins.NotifyNtfy,
# force a failure using basic mode
'response': False,
@ -230,6 +236,121 @@ def test_plugin_ntfy_chat_urls():
AppriseURLTester(tests=apprise_url_tests).run_all()
@mock.patch('requests.post')
def test_plugin_ntfy_attachments(mock_post):
"""
NotifyNtfy() Attachment Checks
"""
# Disable Throttling to speed testing
plugins.NotifyNtfy.request_rate_per_sec = 0
# Prepare Mock return object
response = mock.Mock()
response.content = GOOD_RESPONSE_TEXT
response.status_code = requests.codes.ok
mock_post.return_value = response
# Test how the notifications work without attachments as they use the
# JSON type posting instead
# Reset our mock object
mock_post.reset_mock()
# Prepare our object
obj = Apprise.instantiate(
'ntfy://user:pass@localhost:8080/topic')
# Send a good attachment
assert obj.notify(title="hello", body="world")
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'http://localhost:8080'
response = json.loads(mock_post.call_args_list[0][1]['data'])
assert response['topic'] == 'topic'
assert response['title'] == 'hello'
assert response['message'] == 'world'
assert 'attach' not in response
# Reset our mock object
mock_post.reset_mock()
# prepare our attachment
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Prepare our object
obj = Apprise.instantiate(
'ntfy://user:pass@localhost:8084/topic')
# Send a good attachment
assert obj.notify(body="test", attach=attach) is True
# Test our call count; includes both image and message
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'http://localhost:8084/topic'
assert mock_post.call_args_list[0][1]['params']['message'] == 'test'
assert 'title' not in mock_post.call_args_list[0][1]['params']
assert mock_post.call_args_list[0][1]['params']['filename'] == \
'apprise-test.gif'
# Reset our mock object
mock_post.reset_mock()
# Add another attachment so we drop into the area of the PushBullet code
# that sends remaining attachments (if more detected)
attach.add(os.path.join(TEST_VAR_DIR, 'apprise-test.png'))
# Send our attachments
assert obj.notify(body="test", title="wonderful", attach=attach) is True
# Test our call count
assert mock_post.call_count == 2
# Image + Message sent
assert mock_post.call_args_list[0][0][0] == \
'http://localhost:8084/topic'
assert mock_post.call_args_list[0][1]['params']['message'] == \
'test'
assert mock_post.call_args_list[0][1]['params']['title'] == \
'wonderful'
assert mock_post.call_args_list[0][1]['params']['filename'] == \
'apprise-test.gif'
# Image no 2 (no message)
assert mock_post.call_args_list[1][0][0] == \
'http://localhost:8084/topic'
assert 'message' not in mock_post.call_args_list[1][1]['params']
assert 'title' not in mock_post.call_args_list[1][1]['params']
assert mock_post.call_args_list[1][1]['params']['filename'] == \
'apprise-test.png'
# Reset our mock object
mock_post.reset_mock()
# An invalid attachment will cause a failure
path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
attach = AppriseAttachment(path)
assert obj.notify(body="test", attach=attach) is False
# Test our call count
assert mock_post.call_count == 0
# prepare our attachment
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Throw an exception on the first call to requests.post()
mock_post.return_value = None
for side_effect in (requests.RequestException(), OSError()):
mock_post.side_effect = side_effect
# We'll fail now because of our error handling
assert obj.send(body="test", attach=attach) is False
@mock.patch('requests.post')
def test_plugin_custom_ntfy_edge_cases(mock_post):
"""
@ -297,9 +418,11 @@ def test_plugin_custom_ntfy_edge_cases(mock_post):
# Test our call count
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'http://localhost/topic1'
assert mock_post.call_args_list[0][1]['headers'].get('X-Attach') == \
'http://example.com/file.jpg'
assert mock_post.call_args_list[0][1]['headers'].get('X-Filename') == \
'smoke.jpg'
assert mock_post.call_args_list[0][0][0] == 'http://localhost'
response = json.loads(mock_post.call_args_list[0][1]['data'])
assert response['topic'] == 'topic1'
assert response['message'] == 'body'
assert response['title'] == 'title'
assert response['attach'] == 'http://example.com/file.jpg'
assert response['filename'] == 'smoke.jpg'