diff --git a/apprise/Apprise.py b/apprise/Apprise.py index 19dde830..bb83a49e 100644 --- a/apprise/Apprise.py +++ b/apprise/Apprise.py @@ -31,8 +31,8 @@ # POSSIBILITY OF SUCH DAMAGE. import asyncio +import concurrent.futures as cf import os -from functools import partial from itertools import chain from . import common from .conversion import convert_between @@ -376,7 +376,7 @@ class Apprise: try: # Process arguments and build synchronous and asynchronous calls # (this step can throw internal errors). - sync_partials, async_cors = self._create_notify_calls( + sequential_calls, parallel_calls = self._create_notify_calls( body, title, notify_type=notify_type, body_format=body_format, tag=tag, match_always=match_always, attach=attach, @@ -387,49 +387,13 @@ class Apprise: # No notifications sent, and there was an internal error. return False - if not sync_partials and not async_cors: + if not sequential_calls and not parallel_calls: # Nothing to send return None - sync_result = Apprise._notify_all(*sync_partials) - - if async_cors: - # A single coroutine sends all asynchronous notifications in - # parallel. - all_cor = Apprise._async_notify_all(*async_cors) - - try: - # Python <3.7 automatically starts an event loop if there isn't - # already one for the main thread. - loop = asyncio.get_event_loop() - - except RuntimeError: - # Python >=3.7 raises this exception if there isn't already an - # event loop. So, we can spin up our own. - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.set_debug(self.debug) - - # Run the coroutine and wait for the result. - async_result = loop.run_until_complete(all_cor) - - # Clean up the loop. - loop.close() - asyncio.set_event_loop(None) - - else: - old_debug = loop.get_debug() - loop.set_debug(self.debug) - - # Run the coroutine and wait for the result. - async_result = loop.run_until_complete(all_cor) - - loop.set_debug(old_debug) - - else: - async_result = True - - return sync_result and async_result + sequential_result = Apprise._notify_sequential(*sequential_calls) + parallel_result = Apprise._notify_parallel_threadpool(*parallel_calls) + return sequential_result and parallel_result async def async_notify(self, *args, **kwargs): """ @@ -442,41 +406,42 @@ class Apprise: try: # Process arguments and build synchronous and asynchronous calls # (this step can throw internal errors). - sync_partials, async_cors = self._create_notify_calls( + sequential_calls, parallel_calls = self._create_notify_calls( *args, **kwargs) except TypeError: # No notifications sent, and there was an internal error. return False - if not sync_partials and not async_cors: + if not sequential_calls and not parallel_calls: # Nothing to send return None - sync_result = Apprise._notify_all(*sync_partials) - async_result = await Apprise._async_notify_all(*async_cors) - return sync_result and async_result + sequential_result = Apprise._notify_sequential(*sequential_calls) + parallel_result = \ + await Apprise._notify_parallel_asyncio(*parallel_calls) + return sequential_result and parallel_result def _create_notify_calls(self, *args, **kwargs): """ Creates notifications for all the plugins loaded. - Returns a list of synchronous calls (partial functions with no - arguments required) for plugins with async disabled and a list of - asynchronous calls (coroutines) for plugins with async enabled. + Returns a list of (server, notify() kwargs) tuples for plugins with + parallelism disabled and another list for plugins with parallelism + enabled. """ all_calls = list(self._create_notify_gen(*args, **kwargs)) - # Split into synchronous partials and asynchronous coroutines. - sync_partials, async_cors = [], [] - for notify in all_calls: - if asyncio.iscoroutine(notify): - async_cors.append(notify) + # Split into sequential and parallel notify() calls. + sequential, parallel = [], [] + for (server, notify_kwargs) in all_calls: + if server.asset.async_mode: + parallel.append((server, notify_kwargs)) else: - sync_partials.append(notify) + sequential.append((server, notify_kwargs)) - return sync_partials, async_cors + return sequential, parallel def _create_notify_gen(self, body, title='', notify_type=common.NotifyType.INFO, @@ -584,23 +549,20 @@ class Apprise: attach=attach, body_format=body_format ) - if server.asset.async_mode: - yield server.async_notify(**kwargs) - else: - yield partial(server.notify, **kwargs) + yield (server, kwargs) @staticmethod - def _notify_all(*partials): + def _notify_sequential(*servers_kwargs): """ - Process a list of synchronous notify() calls. + Process a list of notify() calls sequentially and synchronously. """ success = True - for notify in partials: + for (server, kwargs) in servers_kwargs: try: # Send notification - result = notify() + result = server.notify(**kwargs) success = success and result except TypeError: @@ -616,14 +578,59 @@ class Apprise: return success @staticmethod - async def _async_notify_all(*cors): + def _notify_parallel_threadpool(*servers_kwargs): """ - Process a list of asynchronous async_notify() calls. + Process a list of notify() calls in parallel and synchronously. """ + # 0-length case + if not servers_kwargs: + return True + # Create log entry - logger.info('Notifying %d service(s) asynchronously.', len(cors)) + logger.info( + 'Notifying %d service(s) with threads.', len(servers_kwargs)) + with cf.ThreadPoolExecutor() as executor: + success = True + futures = [executor.submit(server.notify, **kwargs) + for (server, kwargs) in servers_kwargs] + + for future in cf.as_completed(futures): + try: + result = future.result() + success = success and result + + except TypeError: + # These are our internally thrown notifications. + success = False + + except Exception: + # A catch all so we don't have to abort early + # just because one of our plugins has a bug in it. + logger.exception("Unhandled Notification Exception") + success = False + + return success + + @staticmethod + async def _notify_parallel_asyncio(*servers_kwargs): + """ + Process a list of async_notify() calls in parallel and asynchronously. + """ + + # 0-length case + if not servers_kwargs: + return True + + # Create log entry + logger.info( + 'Notifying %d service(s) asynchronously.', len(servers_kwargs)) + + async def do_call(server, kwargs): + return await server.async_notify(**kwargs) + + cors = (do_call(server, kwargs) for (server, kwargs) in servers_kwargs) results = await asyncio.gather(*cors, return_exceptions=True) if any(isinstance(status, Exception) diff --git a/test/test_api.py b/test/test_api.py index d97cae56..4ea28365 100644 --- a/test/test_api.py +++ b/test/test_api.py @@ -32,6 +32,7 @@ from __future__ import print_function import asyncio +import concurrent.futures import re import sys import pytest @@ -1781,7 +1782,9 @@ def test_apprise_details_plugin_verification(): @mock.patch('requests.post') @mock.patch('asyncio.gather', wraps=asyncio.gather) -def test_apprise_async_mode(mock_gather, mock_post, tmpdir): +@mock.patch('concurrent.futures.ThreadPoolExecutor', + wraps=concurrent.futures.ThreadPoolExecutor) +def test_apprise_async_mode(mock_threadpool, mock_gather, mock_post, tmpdir): """ API: Apprise() async_mode tests @@ -1814,9 +1817,9 @@ def test_apprise_async_mode(mock_gather, mock_post, tmpdir): # Send Notifications Asyncronously assert a.notify("async") is True - # Verify our async code got executed - assert mock_gather.call_count > 0 - mock_gather.reset_mock() + # Verify our thread pool was created + assert mock_threadpool.call_count == 1 + mock_threadpool.reset_mock() # Provide an over-ride now asset = AppriseAsset(async_mode=False) @@ -1863,9 +1866,9 @@ def test_apprise_async_mode(mock_gather, mock_post, tmpdir): # Send 1 Notification Syncronously, the other Asyncronously assert a.notify("a mixed batch") is True - # Verify our async code got called - assert mock_gather.call_count > 0 - mock_gather.reset_mock() + # Verify our thread pool was created + assert mock_threadpool.call_count == 1 + mock_threadpool.reset_mock() def test_notify_matrix_dynamic_importing(tmpdir): diff --git a/test/test_apprise_config.py b/test/test_apprise_config.py index 7dd9b6ff..e920866e 100644 --- a/test/test_apprise_config.py +++ b/test/test_apprise_config.py @@ -561,7 +561,7 @@ def test_apprise_config_with_apprise_obj(tmpdir): super().__init__( notify_format=NotifyFormat.HTML, **kwargs) - async def async_notify(self, **kwargs): + def notify(self, **kwargs): # Pretend everything is okay return True