From 65d447ec4c319ca70a53971d68682b56bbbc350f Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Mon, 18 Apr 2022 17:07:47 -0400 Subject: [PATCH] Ntfy attachment support added (#571) --- apprise/plugins/NotifyNtfy.py | 266 ++++++++++++++++++++++------------ test/test_plugin_ntfy.py | 139 +++++++++++++++++- 2 files changed, 306 insertions(+), 99 deletions(-) diff --git a/apprise/plugins/NotifyNtfy.py b/apprise/plugins/NotifyNtfy.py index c691612a..b19cf7a9 100644 --- a/apprise/plugins/NotifyNtfy.py +++ b/apprise/plugins/NotifyNtfy.py @@ -29,6 +29,7 @@ import re import requests import six from json import loads +from json import dumps from os.path import basename from .NotifyBase import NotifyBase @@ -39,6 +40,7 @@ from ..utils import is_hostname from ..utils import is_ipaddr from ..utils import validate_regex from ..URLBase import PrivacyMode +from ..attachment.AttachBase import AttachBase class NtfyMode(object): @@ -258,7 +260,8 @@ class NotifyNtfy(NotifyBase): self.topics.append(topic) return - def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): + def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, + **kwargs): """ Perform ntfy Notification """ @@ -271,41 +274,71 @@ class NotifyNtfy(NotifyBase): self.logger.warning('There are no ntfy topics to notify') return False + # Create a copy of the subreddits list + topics = list(self.topics) + while len(topics) > 0: + # Retrieve our topic + topic = topics.pop() + + if attach: + # We need to upload our payload first so that we can source it + # in remaining messages + for no, attachment in enumerate(attach): + + # First message only includes the text + _body = body if not no else None + _title = title if not no else None + + # Perform some simple error checking + if not attachment: + # We could not access the attachment + self.logger.error( + 'Could not access attachment {}.'.format( + attachment.url(privacy=True))) + return False + + self.logger.debug( + 'Preparing ntfy attachment {}'.format( + attachment.url(privacy=True))) + + okay, response = self._send( + topic, body=_body, title=_title, attach=attachment) + if not okay: + # We can't post our attachment; abort immediately + return False + else: + # Send our Notification Message + okay, response = self._send(topic, body=body, title=title) + if not okay: + # Mark our failure, but contiue to move on + has_error = True + + return not has_error + + def _send(self, topic, body=None, title=None, attach=None, **kwargs): + """ + Wrapper to the requests (post) object + """ + # Prepare our headers headers = { 'User-Agent': self.app_id, } - if self.priority != NtfyPriority.NORMAL: - headers['X-Priority'] = self.priority + # Some default values for our request object to which we'll update + # depending on what our payload is + files = None - if title: - headers['X-Title'] = title + # See https://ntfy.sh/docs/publish/#publish-as-json + data = {} - if self.attach is not None: - headers['X-Attach'] = self.attach - if self.filename is not None: - headers['X-Filename'] = self.filename - - if self.click is not None: - headers['X-Click'] = self.click - - if self.delay is not None: - headers['X-Delay'] = self.delay - - if self.email is not None: - headers['X-Email'] = self.email - - if self.__tags: - headers['X-Tags'] = ",".join(self.__tags) - - # Prepare our payload - payload = body + # Posting Parameters + params = {} auth = None if self.mode == NtfyMode.CLOUD: # Cloud Service - template_url = self.cloud_notify_url + notify_url = self.cloud_notify_url else: # NotifyNtfy.PRVATE # Allow more settings to be applied now @@ -315,91 +348,142 @@ class NotifyNtfy(NotifyBase): # Prepare our ntfy Template URL schema = 'https' if self.secure else 'http' - template_url = '%s://%s' % (schema, self.host) + notify_url = '%s://%s' % (schema, self.host) if isinstance(self.port, int): - template_url += ':%d' % self.port + notify_url += ':%d' % self.port - template_url += '/{topic}' + if not attach: + headers['Content-Type'] = 'application/json' - # Create a copy of the subreddits list - topics = list(self.topics) - while len(topics) > 0: - # Retrieve our topic - topic = topics.pop() + data['topic'] = topic + virt_payload = data - # Create our Posting URL per topic provided - url = template_url.format(topic=topic) - self.logger.debug('ntfy POST URL: %s (cert_verify=%r)' % ( - url, self.verify_certificate, - )) - self.logger.debug('ntfy Payload: %s' % str(payload)) + else: + # Point our payload to our parameters + virt_payload = params + notify_url += '/{topic}'.format(topic=topic) - # Always call throttle before any remote server i/o is made - self.throttle() + if title: + virt_payload['title'] = title - try: - r = requests.post( - url, - data=payload, - headers=headers, - auth=auth, - verify=self.verify_certificate, - timeout=self.request_timeout, - ) + if body: + virt_payload['message'] = body - if r.status_code != requests.codes.ok: - # We had a problem - status_str = \ - NotifyBase.http_response_code_lookup(r.status_code) + if self.priority != NtfyPriority.NORMAL: + headers['X-Priority'] = self.priority - # set up our status code to use - status_code = r.status_code + if self.delay is not None: + headers['X-Delay'] = self.delay - try: - # Update our status response if we can - json_response = loads(r.content) - status_str = json_response.get('error', status_str) - status_code = \ - int(json_response.get('code', status_code)) + if self.click is not None: + headers['X-Click'] = self.click - except (AttributeError, TypeError, ValueError): - # ValueError = r.content is Unparsable - # TypeError = r.content is None - # AttributeError = r is None + if self.email is not None: + headers['X-Email'] = self.email - # We could not parse JSON response. - # We will just use the status we already have. - pass + if self.__tags: + headers['X-Tags'] = ",".join(self.__tags) - self.logger.warning( - "Failed to send ntfy notification to topic '{}': " - '{}{}error={}.'.format( - topic, - status_str, - ', ' if status_str else '', - status_code)) + if isinstance(attach, AttachBase): + # Prepare our Header + params['filename'] = attach.name - self.logger.debug( - 'Response Details:\r\n{}'.format(r.content)) + # prepare our files object + files = {'file': (attach.name, open(attach.path, 'rb'))} - # Mark our failure - has_error = True + elif self.attach is not None: + data['attach'] = self.attach + if self.filename is not None: + data['filename'] = self.filename - else: - self.logger.info( - "Sent ntfy notification to '{}'.".format(url)) + self.logger.debug('ntfy POST URL: %s (cert_verify=%r)' % ( + notify_url, self.verify_certificate, + )) + self.logger.debug('ntfy Payload: %s' % str(virt_payload)) + self.logger.debug('ntfy Headers: %s' % str(headers)) + + # Always call throttle before any remote server i/o is made + self.throttle() + + # Default response type + response = None + + try: + r = requests.post( + notify_url, + params=params if params else None, + data=dumps(data) if data else None, + headers=headers, + files=files, + auth=auth, + verify=self.verify_certificate, + timeout=self.request_timeout, + ) + + if r.status_code != requests.codes.ok: + # We had a problem + status_str = \ + NotifyBase.http_response_code_lookup(r.status_code) + + # set up our status code to use + status_code = r.status_code + + try: + # Update our status response if we can + response = loads(r.content) + status_str = response.get('error', status_str) + status_code = \ + int(response.get('code', status_code)) + + except (AttributeError, TypeError, ValueError): + # ValueError = r.content is Unparsable + # TypeError = r.content is None + # AttributeError = r is None + + # We could not parse JSON response. + # We will just use the status we already have. + pass - except requests.RequestException as e: self.logger.warning( - 'A Connection error occurred sending ntfy:%s ' % ( - url) + 'notification.' - ) - self.logger.debug('Socket Exception: %s' % str(e)) + "Failed to send ntfy notification to topic '{}': " + '{}{}error={}.'.format( + topic, + status_str, + ', ' if status_str else '', + status_code)) - # Mark our failure - has_error = True + self.logger.debug( + 'Response Details:\r\n{}'.format(r.content)) - return not has_error + return False, response + + # otherwise we were successful + self.logger.info( + "Sent ntfy notification to '{}'.".format(notify_url)) + + return True, response + + except requests.RequestException as e: + self.logger.warning( + 'A Connection error occurred sending ntfy:%s ' % ( + notify_url) + 'notification.' + ) + self.logger.debug('Socket Exception: %s' % str(e)) + return False, response + + except (OSError, IOError) as e: + self.logger.warning( + 'An I/O error occurred while handling {}.'.format( + attach.name if isinstance(attach, AttachBase) + else virt_payload)) + self.logger.debug('I/O Exception: %s' % str(e)) + return False, response + + finally: + # Close our file (if it's open) stored in the second element + # of our files tuple (index 1) + if files: + files['file'][1].close() def url(self, privacy=False, *args, **kwargs): """ diff --git a/test/test_plugin_ntfy.py b/test/test_plugin_ntfy.py index c5ed53c8..34ab3024 100644 --- a/test/test_plugin_ntfy.py +++ b/test/test_plugin_ntfy.py @@ -22,17 +22,23 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. +import os import json import mock import requests +from apprise import Apprise from apprise import plugins from apprise import NotifyType from helpers import AppriseURLTester +from apprise import AppriseAttachment # Disable logging for a cleaner testing output import logging logging.disable(logging.CRITICAL) +# Attachment Directory +TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') + # For testing our return response GOOD_RESPONSE_TEXT = { 'code': '0', @@ -66,7 +72,7 @@ apprise_url_tests = ( 'response': False, }), # No topics - ('ntfy://user:pass@localhost', { + ('ntfy://user:pass@localhost?mode=private', { 'instance': plugins.NotifyNtfy, # invalid topics specified (nothing to notify) # as a result the response type will be false @@ -197,7 +203,7 @@ apprise_url_tests = ( ('ntfys://user:web@-_/topic1/topic2/?mode=private', { 'instance': None, }), - ('ntfy://user:pass@localhost:8081/topic/topic2', { + ('ntfy://user:pass@localhost:8089/topic/topic2', { 'instance': plugins.NotifyNtfy, # force a failure using basic mode 'response': False, @@ -230,6 +236,121 @@ def test_plugin_ntfy_chat_urls(): AppriseURLTester(tests=apprise_url_tests).run_all() +@mock.patch('requests.post') +def test_plugin_ntfy_attachments(mock_post): + """ + NotifyNtfy() Attachment Checks + + """ + # Disable Throttling to speed testing + plugins.NotifyNtfy.request_rate_per_sec = 0 + + # Prepare Mock return object + response = mock.Mock() + response.content = GOOD_RESPONSE_TEXT + response.status_code = requests.codes.ok + mock_post.return_value = response + + # Test how the notifications work without attachments as they use the + # JSON type posting instead + + # Reset our mock object + mock_post.reset_mock() + + # Prepare our object + obj = Apprise.instantiate( + 'ntfy://user:pass@localhost:8080/topic') + + # Send a good attachment + assert obj.notify(title="hello", body="world") + assert mock_post.call_count == 1 + + assert mock_post.call_args_list[0][0][0] == \ + 'http://localhost:8080' + + response = json.loads(mock_post.call_args_list[0][1]['data']) + assert response['topic'] == 'topic' + assert response['title'] == 'hello' + assert response['message'] == 'world' + assert 'attach' not in response + + # Reset our mock object + mock_post.reset_mock() + + # prepare our attachment + attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif')) + + # Prepare our object + obj = Apprise.instantiate( + 'ntfy://user:pass@localhost:8084/topic') + + # Send a good attachment + assert obj.notify(body="test", attach=attach) is True + + # Test our call count; includes both image and message + assert mock_post.call_count == 1 + + assert mock_post.call_args_list[0][0][0] == \ + 'http://localhost:8084/topic' + + assert mock_post.call_args_list[0][1]['params']['message'] == 'test' + assert 'title' not in mock_post.call_args_list[0][1]['params'] + assert mock_post.call_args_list[0][1]['params']['filename'] == \ + 'apprise-test.gif' + + # Reset our mock object + mock_post.reset_mock() + + # Add another attachment so we drop into the area of the PushBullet code + # that sends remaining attachments (if more detected) + attach.add(os.path.join(TEST_VAR_DIR, 'apprise-test.png')) + + # Send our attachments + assert obj.notify(body="test", title="wonderful", attach=attach) is True + + # Test our call count + assert mock_post.call_count == 2 + # Image + Message sent + assert mock_post.call_args_list[0][0][0] == \ + 'http://localhost:8084/topic' + assert mock_post.call_args_list[0][1]['params']['message'] == \ + 'test' + assert mock_post.call_args_list[0][1]['params']['title'] == \ + 'wonderful' + assert mock_post.call_args_list[0][1]['params']['filename'] == \ + 'apprise-test.gif' + + # Image no 2 (no message) + assert mock_post.call_args_list[1][0][0] == \ + 'http://localhost:8084/topic' + assert 'message' not in mock_post.call_args_list[1][1]['params'] + assert 'title' not in mock_post.call_args_list[1][1]['params'] + assert mock_post.call_args_list[1][1]['params']['filename'] == \ + 'apprise-test.png' + + # Reset our mock object + mock_post.reset_mock() + + # An invalid attachment will cause a failure + path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg') + attach = AppriseAttachment(path) + assert obj.notify(body="test", attach=attach) is False + + # Test our call count + assert mock_post.call_count == 0 + + # prepare our attachment + attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif')) + + # Throw an exception on the first call to requests.post() + mock_post.return_value = None + for side_effect in (requests.RequestException(), OSError()): + mock_post.side_effect = side_effect + + # We'll fail now because of our error handling + assert obj.send(body="test", attach=attach) is False + + @mock.patch('requests.post') def test_plugin_custom_ntfy_edge_cases(mock_post): """ @@ -297,9 +418,11 @@ def test_plugin_custom_ntfy_edge_cases(mock_post): # Test our call count assert mock_post.call_count == 1 - assert mock_post.call_args_list[0][0][0] == \ - 'http://localhost/topic1' - assert mock_post.call_args_list[0][1]['headers'].get('X-Attach') == \ - 'http://example.com/file.jpg' - assert mock_post.call_args_list[0][1]['headers'].get('X-Filename') == \ - 'smoke.jpg' + assert mock_post.call_args_list[0][0][0] == 'http://localhost' + + response = json.loads(mock_post.call_args_list[0][1]['data']) + assert response['topic'] == 'topic1' + assert response['message'] == 'body' + assert response['title'] == 'title' + assert response['attach'] == 'http://example.com/file.jpg' + assert response['filename'] == 'smoke.jpg'