# -*- coding: utf-8 -*- # BSD 2-Clause License # # Apprise - Push Notification Library. # Copyright (c) 2024, Chris Caron # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. import re from unittest import mock import requests import mimetypes from os.path import join from os.path import dirname from os.path import getsize from apprise.attachment.AttachHTTP import AttachHTTP from apprise import Apprise, AppriseAttachment from apprise.NotificationManager import NotificationManager from apprise.plugins.NotifyBase import NotifyBase from apprise.common import ContentLocation # Disable logging for a cleaner testing output import logging logging.disable(logging.CRITICAL) TEST_VAR_DIR = join(dirname(__file__), 'var') # Grant access to our Notification Manager Singleton N_MGR = NotificationManager() # Some exception handling we'll use REQUEST_EXCEPTIONS = ( requests.ConnectionError( 0, 'requests.ConnectionError() not handled'), requests.RequestException( 0, 'requests.RequestException() not handled'), requests.HTTPError( 0, 'requests.HTTPError() not handled'), requests.ReadTimeout( 0, 'requests.ReadTimeout() not handled'), requests.TooManyRedirects( 0, 'requests.TooManyRedirects() not handled'), # Throw OSError exceptions too OSError("SystemError") ) def test_attach_http_parse_url(): """ API: AttachHTTP().parse_url() """ # bad entry assert AttachHTTP.parse_url('garbage://') is None # no url specified assert AttachHTTP.parse_url('http://') is None def test_attach_http_query_string_dictionary(): """ API: AttachHTTP() Query String Dictionary """ # no qsd specified results = AttachHTTP.parse_url('http://localhost') assert isinstance(results, dict) # Create our object obj = AttachHTTP(**results) assert isinstance(obj, AttachHTTP) assert re.search(r'[?&]verify=yes', obj.url()) # Now lets create a URL with a custom Query String entry # some custom qsd entries specified results = AttachHTTP.parse_url('http://localhost?dl=1&_var=test') assert isinstance(results, dict) # Create our object obj = AttachHTTP(**results) assert isinstance(obj, AttachHTTP) assert re.search(r'[?&]verify=yes', obj.url()) # But now test that our custom arguments have also been set assert re.search(r'[?&]dl=1', obj.url()) assert re.search(r'[?&]_var=test', obj.url()) @mock.patch('requests.post') @mock.patch('requests.get') def test_attach_http(mock_get, mock_post): """ API: AttachHTTP() object """ # Define our good:// url class GoodNotification(NotifyBase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def notify(self, *args, **kwargs): # Pretend everything is okay return True def url(self): # Support url() function return '' # Store our good notification in our schema map N_MGR['good'] = GoodNotification # Temporary path path = join(TEST_VAR_DIR, 'apprise-test.gif') class DummyResponse: """ A dummy response used to manage our object """ status_code = requests.codes.ok headers = { 'Content-Length': getsize(path), 'Content-Type': 'image/gif', } # Pointer to file ptr = None # used to return random keep-alive chunks _keepalive_chunk_ref = 0 def close(self): return def iter_content(self, chunk_size=1024): """Lazy function (generator) to read a file piece by piece. Default chunk size: 1k.""" while True: self._keepalive_chunk_ref += 1 if 16 % self._keepalive_chunk_ref == 0: # Yield a keep-alive block yield '' data = self.ptr.read(chunk_size) if not data: break yield data def raise_for_status(self): return def __enter__(self): self.ptr = open(path, 'rb') return self def __exit__(self, *args, **kwargs): self.ptr.close() # Prepare Mock dummy_response = DummyResponse() mock_get.return_value = dummy_response # Test custom url get parameters results = AttachHTTP.parse_url( 'http://user:pass@localhost/apprise.gif?dl=1&cache=300') assert isinstance(results, dict) attachment = AttachHTTP(**results) assert isinstance(attachment.url(), str) is True # Test that our extended variables are passed along assert mock_get.call_count == 0 assert attachment assert mock_get.call_count == 1 assert 'params' in mock_get.call_args_list[0][1] assert 'dl' in mock_get.call_args_list[0][1]['params'] # Verify that arguments that are reserved for apprise are not # passed along assert 'cache' not in mock_get.call_args_list[0][1]['params'] results = AttachHTTP.parse_url( 'http://user:pass@localhost/apprise.gif?+key=value&cache=True') assert isinstance(results, dict) attachment = AttachHTTP(**results) assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None # No Content-Disposition; so we use filename from path assert attachment.name == 'apprise.gif' assert attachment.mimetype == 'image/gif' results = AttachHTTP.parse_url( 'http://localhost:3000/noname.gif?name=usethis.jpg&mime=image/jpeg') assert isinstance(results, dict) attachment = AttachHTTP(**results) assert isinstance(attachment.url(), str) is True # both mime and name over-ridden assert re.search(r'[?&]mime=image%2Fjpeg', attachment.url()) assert re.search(r'[?&]name=usethis.jpg', attachment.url()) # No Content-Disposition; so we use filename from path assert attachment.name == 'usethis.jpg' assert attachment.mimetype == 'image/jpeg' # Edge case; download called a second time when content already retrieved assert attachment.download() assert attachment assert len(attachment) == getsize(path) # Test case where location is simply set to INACCESSIBLE # Below is a bad example, but it proves the section of code properly works. # Ideally a server admin may wish to just disable all HTTP based # attachments entirely. In this case, they simply just need to change the # global singleton at the start of their program like: # # import apprise # apprise.attachment.AttachHTTP.location = \ # apprise.ContentLocation.INACCESSIBLE attachment = AttachHTTP(**results) attachment.location = ContentLocation.INACCESSIBLE assert attachment.path is None # Downloads just don't work period assert attachment.download() is False # No path specified # No Content-Disposition specified # No filename (because no path) results = AttachHTTP.parse_url('http://localhost') assert isinstance(results, dict) attachment = AttachHTTP(**results) assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None assert attachment.mimetype == 'image/gif' # Because we could determine our mime type, we could build an extension # for our unknown filename assert attachment.name == '{}{}'.format( AttachHTTP.unknown_filename, mimetypes.guess_extension(attachment.mimetype) ) assert attachment assert len(attachment) == getsize(path) # Set Content-Length to a value that exceeds our maximum allowable dummy_response.headers['Content-Length'] = AttachHTTP.max_file_size + 1 results = AttachHTTP.parse_url('http://localhost/toobig.jpg') assert isinstance(results, dict) attachment = AttachHTTP(**results) # we can not download this attachment assert not attachment assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None assert attachment.mimetype is None assert attachment.name is None assert len(attachment) == 0 # Handle cases where we have no Content-Length and we need to rely # on what is read as it is streamed del dummy_response.headers['Content-Length'] # No path specified # No Content-Disposition specified # No Content-Length specified # No filename (because no path) results = AttachHTTP.parse_url('http://localhost/no-length.gif') assert isinstance(results, dict) attachment = AttachHTTP(**results) assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None assert attachment.mimetype == 'image/gif' # Because we could determine our mime type, we could build an extension # for our unknown filename assert attachment.name == 'no-length.gif' assert attachment assert len(attachment) == getsize(path) # Set our limit to be the length of our image; everything should work # without a problem max_file_size = AttachHTTP.max_file_size AttachHTTP.max_file_size = getsize(path) # Set ourselves a Content-Disposition (providing a filename) dummy_response.headers['Content-Disposition'] = \ 'attachment; filename="myimage.gif"' # Remove our content type so we're forced to guess it from our filename # specified in our Content-Disposition del dummy_response.headers['Content-Type'] # No path specified # No Content-Length specified # Filename in Content-Disposition (over-rides one found in path results = AttachHTTP.parse_url('http://user@localhost/ignore-filename.gif') assert isinstance(results, dict) attachment = AttachHTTP(**results) assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None assert attachment.mimetype == 'image/gif' # Because we could determine our mime type, we could build an extension # for our unknown filename assert attachment.name == 'myimage.gif' assert attachment assert len(attachment) == getsize(path) # Similar to test above except we make our max message size just 1 byte # smaller then our gif file. This will cause us to fail to read the # attachment AttachHTTP.max_file_size = getsize(path) - 1 results = AttachHTTP.parse_url('http://localhost/toobig.jpg') assert isinstance(results, dict) attachment = AttachHTTP(**results) # we can not download this attachment assert not attachment assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None assert attachment.mimetype is None assert attachment.name is None assert len(attachment) == 0 # Disable our file size limitations AttachHTTP.max_file_size = 0 results = AttachHTTP.parse_url('http://user@localhost') assert isinstance(results, dict) attachment = AttachHTTP(**results) assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None assert attachment.mimetype == 'image/gif' # Because we could determine our mime type, we could build an extension # for our unknown filename assert attachment.name == 'myimage.gif' assert attachment assert len(attachment) == getsize(path) # Set our header up with an invalid Content-Length; we can still process # this data. It just means we track it lower when reading back content dummy_response.headers = { 'Content-Length': 'invalid' } results = AttachHTTP.parse_url('http://localhost/invalid-length.gif') assert isinstance(results, dict) attachment = AttachHTTP(**results) assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None assert attachment.mimetype == 'image/gif' # Because we could determine our mime type, we could build an extension # for our unknown filename assert attachment.name == 'invalid-length.gif' assert attachment # Give ourselves nothing to work with dummy_response.headers = {} results = AttachHTTP.parse_url('http://user@localhost') assert isinstance(results, dict) attachment = AttachHTTP(**results) # we can not download this attachment assert attachment assert isinstance(attachment.url(), str) is True # No mime-type and/or filename over-ride was specified, so therefore it # won't show up in the generated URL assert re.search(r'[?&]mime=', attachment.url()) is None assert re.search(r'[?&]name=', attachment.url()) is None # Handle edge-case where detected_name is None for whatever reason attachment.detected_name = None assert attachment.mimetype == attachment.unknown_mimetype assert attachment.name.startswith(AttachHTTP.unknown_filename) assert len(attachment) == getsize(path) # Exception handling mock_get.return_value = None for _exception in REQUEST_EXCEPTIONS: aa = AppriseAttachment.instantiate( 'http://localhost/exception.gif?cache=30') assert isinstance(aa, AttachHTTP) mock_get.side_effect = _exception assert not aa # Restore value AttachHTTP.max_file_size = max_file_size # Multi Message Testing mock_get.side_effect = None mock_get.return_value = DummyResponse() # Prepare our POST response (from notify call) response = requests.Request() response.status_code = requests.codes.ok response.content = "" mock_post.return_value = response mock_get.reset_mock() mock_post.reset_mock() assert mock_get.call_count == 0 apobj = Apprise() assert apobj.add('form://localhost') assert apobj.add('json://localhost') assert apobj.add('xml://localhost') assert len(apobj) == 3 assert apobj.notify( body='one attachment split 3 times', attach="http://localhost/test.gif", ) is True # We posted 3 times assert mock_post.call_count == 3 # We only fetched once and re-used the same fetch for all posts assert mock_get.call_count == 1 mock_get.reset_mock() mock_post.reset_mock() apobj = Apprise() for n in range(10): assert apobj.add(f'json://localhost?:entry={n}&method=post') assert apobj.add(f'form://localhost?:entry={n}&method=post') assert apobj.add(f'xml://localhost?:entry={n}&method=post') assert apobj.notify( body='one attachment split 30 times', attach="http://localhost/test.gif", ) is True # We posted 30 times assert mock_post.call_count == 30 # We only fetched once and re-used the same fetch for all posts assert mock_get.call_count == 1