Refactored async notification handling (#741)

This commit is contained in:
Ryan Young 2023-02-15 19:50:10 -08:00 committed by GitHub
parent d395d89a3b
commit c2fdd47b9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 308 additions and 330 deletions

View File

@ -30,7 +30,9 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
import asyncio
import os import os
from functools import partial
from itertools import chain from itertools import chain
from . import common from . import common
from .conversion import convert_between from .conversion import convert_between
@ -50,11 +52,6 @@ from .plugins.NotifyBase import NotifyBase
from . import plugins from . import plugins
from . import __version__ from . import __version__
# Python v3+ support code made importable, so it can remain backwards
# compatible with Python v2
# TODO: Review after dropping support for Python 2.
from . import py3compat
class Apprise: class Apprise:
""" """
@ -376,91 +373,118 @@ class Apprise:
such as turning a \n into an actual new line, etc. such as turning a \n into an actual new line, etc.
""" """
return py3compat.asyncio.tosync( try:
self.async_notify( # Process arguments and build synchronous and asynchronous calls
# (this step can throw internal errors).
sync_partials, async_cors = self._create_notify_calls(
body, title, body, title,
notify_type=notify_type, body_format=body_format, notify_type=notify_type, body_format=body_format,
tag=tag, match_always=match_always, attach=attach, tag=tag, match_always=match_always, attach=attach,
interpret_escapes=interpret_escapes, interpret_escapes=interpret_escapes
), )
debug=self.debug
)
def async_notify(self, *args, **kwargs): except TypeError:
# No notifications sent, and there was an internal error.
return False
if not sync_partials and not async_cors:
# Nothing to send
return None
sync_result = Apprise._notify_all(*sync_partials)
if async_cors:
# A single coroutine sends all asynchronous notifications in
# parallel.
all_cor = Apprise._async_notify_all(*async_cors)
try:
# Python <3.7 automatically starts an event loop if there isn't
# already one for the main thread.
loop = asyncio.get_event_loop()
except RuntimeError:
# Python >=3.7 raises this exception if there isn't already an
# event loop. So, we can spin up our own.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(self.debug)
# Run the coroutine and wait for the result.
async_result = loop.run_until_complete(all_cor)
# Clean up the loop.
loop.close()
asyncio.set_event_loop(None)
else:
old_debug = loop.get_debug()
loop.set_debug(self.debug)
# Run the coroutine and wait for the result.
async_result = loop.run_until_complete(all_cor)
loop.set_debug(old_debug)
else:
async_result = True
return sync_result and async_result
async def async_notify(self, *args, **kwargs):
""" """
Send a notification to all the plugins previously loaded, for Send a notification to all the plugins previously loaded, for
asynchronous callers. This method is an async method that should be asynchronous callers.
awaited on, even if it is missing the async keyword in its signature.
(This is omitted to preserve syntax compatibility with Python 2.)
The arguments are identical to those of Apprise.notify(). The arguments are identical to those of Apprise.notify().
""" """
try: try:
coroutines = list( # Process arguments and build synchronous and asynchronous calls
self._notifyall( # (this step can throw internal errors).
Apprise._notifyhandlerasync, *args, **kwargs)) sync_partials, async_cors = self._create_notify_calls(
*args, **kwargs)
except TypeError: except TypeError:
# No notifications sent, and there was an internal error. # No notifications sent, and there was an internal error.
return py3compat.asyncio.toasyncwrapvalue(False)
else:
if len(coroutines) > 0:
# All notifications sent, return False if any failed.
return py3compat.asyncio.notify(coroutines)
else:
# No notifications sent.
return py3compat.asyncio.toasyncwrapvalue(None)
@staticmethod
def _notifyhandler(server, **kwargs):
"""
The synchronous notification sender. Returns True if the notification
sent successfully.
"""
try:
# Send notification
return server.notify(**kwargs)
except TypeError:
# These our our internally thrown notifications
return False return False
except Exception: if not sync_partials and not async_cors:
# A catch all so we don't have to abort early # Nothing to send
# just because one of our plugins has a bug in it. return None
logger.exception("Unhandled Notification Exception")
return False
@staticmethod sync_result = Apprise._notify_all(*sync_partials)
def _notifyhandlerasync(server, **kwargs): async_result = await Apprise._async_notify_all(*async_cors)
""" return sync_result and async_result
The asynchronous notification sender. Returns a coroutine that yields
True if the notification sent successfully.
"""
if server.asset.async_mode: def _create_notify_calls(self, *args, **kwargs):
return server.async_notify(**kwargs)
else:
# Send the notification immediately, and wrap the result in a
# coroutine.
status = Apprise._notifyhandler(server, **kwargs)
return py3compat.asyncio.toasyncwrapvalue(status)
def _notifyall(self, handler, body, title='',
notify_type=common.NotifyType.INFO, body_format=None,
tag=common.MATCH_ALL_TAG, match_always=True, attach=None,
interpret_escapes=None):
""" """
Creates notifications for all the plugins loaded. Creates notifications for all the plugins loaded.
Returns a generator that calls handler for each notification. The first Returns a list of synchronous calls (partial functions with no
and only argument supplied to handler is the server, and the keyword arguments required) for plugins with async disabled and a list of
arguments are exactly as they would be passed to server.notify(). asynchronous calls (coroutines) for plugins with async enabled.
"""
all_calls = list(self._create_notify_gen(*args, **kwargs))
# Split into synchronous partials and asynchronous coroutines.
sync_partials, async_cors = [], []
for notify in all_calls:
if asyncio.iscoroutine(notify):
async_cors.append(notify)
else:
sync_partials.append(notify)
return sync_partials, async_cors
def _create_notify_gen(self, body, title='',
notify_type=common.NotifyType.INFO,
body_format=None, tag=common.MATCH_ALL_TAG,
match_always=True, attach=None,
interpret_escapes=None):
"""
Internal generator function for _create_notify_calls().
""" """
if len(self) == 0: if len(self) == 0:
@ -553,14 +577,67 @@ class Apprise:
logger.error(msg) logger.error(msg)
raise TypeError(msg) raise TypeError(msg)
yield handler( kwargs = dict(
server,
body=conversion_body_map[server.notify_format], body=conversion_body_map[server.notify_format],
title=conversion_title_map[server.notify_format], title=conversion_title_map[server.notify_format],
notify_type=notify_type, notify_type=notify_type,
attach=attach, attach=attach,
body_format=body_format, body_format=body_format
) )
if server.asset.async_mode:
yield server.async_notify(**kwargs)
else:
yield partial(server.notify, **kwargs)
@staticmethod
def _notify_all(*partials):
"""
Process a list of synchronous notify() calls.
"""
success = True
for notify in partials:
try:
# Send notification
result = notify()
success = success and result
except TypeError:
# These are our internally thrown notifications.
success = False
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Unhandled Notification Exception")
success = False
return success
@staticmethod
async def _async_notify_all(*cors):
"""
Process a list of asynchronous async_notify() calls.
"""
# Create log entry
logger.info('Notifying %d service(s) asynchronously.', len(cors))
results = await asyncio.gather(*cors, return_exceptions=True)
if any(isinstance(status, Exception)
and not isinstance(status, TypeError) for status in results):
# A catch all so we don't have to abort early just because
# one of our plugins has a bug in it.
logger.exception("Unhandled Notification Exception")
return False
if any(isinstance(status, TypeError) for status in results):
# These are our internally thrown notifications.
return False
return all(results)
def details(self, lang=None, show_requirements=False, show_disabled=False): def details(self, lang=None, show_requirements=False, show_disabled=False):
""" """

View File

@ -30,7 +30,9 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
import asyncio
import re import re
from functools import partial
from ..URLBase import URLBase from ..URLBase import URLBase
from ..common import NotifyType from ..common import NotifyType
@ -43,12 +45,7 @@ from ..AppriseLocale import gettext_lazy as _
from ..AppriseAttachment import AppriseAttachment from ..AppriseAttachment import AppriseAttachment
# Wrap our base with the asyncio wrapper class NotifyBase(URLBase):
from ..py3compat.asyncio import AsyncNotifyBase
BASE_OBJECT = AsyncNotifyBase
class NotifyBase(BASE_OBJECT):
""" """
This is the base class for all notification services This is the base class for all notification services
""" """
@ -274,19 +271,64 @@ class NotifyBase(BASE_OBJECT):
color_type=color_type, color_type=color_type,
) )
def notify(self, body, title=None, notify_type=NotifyType.INFO, def notify(self, *args, **kwargs):
overflow=None, attach=None, body_format=None, **kwargs):
""" """
Performs notification Performs notification
"""
try:
# Build a list of dictionaries that can be used to call send().
send_calls = list(self._build_send_calls(*args, **kwargs))
except TypeError:
# Internal error
return False
else:
# Loop through each call, one at a time. (Use a list rather than a
# generator to call all the partials, even in case of a failure.)
the_calls = [self.send(**kwargs2) for kwargs2 in send_calls]
return all(the_calls)
async def async_notify(self, *args, **kwargs):
"""
Performs notification for asynchronous callers
"""
try:
# Build a list of dictionaries that can be used to call send().
send_calls = list(self._build_send_calls(*args, **kwargs))
except TypeError:
# Internal error
return False
else:
loop = asyncio.get_event_loop()
# Wrap each call in a coroutine that uses the default executor.
# TODO: In the future, allow plugins to supply a native
# async_send() method.
async def do_send(**kwargs2):
send = partial(self.send, **kwargs2)
result = await loop.run_in_executor(None, send)
return result
# gather() all calls in parallel.
the_cors = (do_send(**kwargs2) for kwargs2 in send_calls)
return all(await asyncio.gather(*the_cors))
def _build_send_calls(self, body, title=None,
notify_type=NotifyType.INFO, overflow=None,
attach=None, body_format=None, **kwargs):
"""
Get a list of dictionaries that can be used to call send() or
(in the future) async_send().
""" """
if not self.enabled: if not self.enabled:
# Deny notifications issued to services that are disabled # Deny notifications issued to services that are disabled
self.logger.warning( msg = f"{self.service_name} is currently disabled on this system."
"{} is currently disabled on this system.".format( self.logger.warning(msg)
self.service_name)) raise TypeError(msg)
return False
# Prepare attachments if required # Prepare attachments if required
if attach is not None and not isinstance(attach, AppriseAttachment): if attach is not None and not isinstance(attach, AppriseAttachment):
@ -295,7 +337,7 @@ class NotifyBase(BASE_OBJECT):
except TypeError: except TypeError:
# bad attachments # bad attachments
return False raise
# Handle situations where the title is None # Handle situations where the title is None
title = '' if not title else title title = '' if not title else title
@ -306,14 +348,11 @@ class NotifyBase(BASE_OBJECT):
body_format=body_format): body_format=body_format):
# Send notification # Send notification
if not self.send(body=chunk['body'], title=chunk['title'], yield dict(
notify_type=notify_type, attach=attach, body=chunk['body'], title=chunk['title'],
body_format=body_format): notify_type=notify_type, attach=attach,
body_format=body_format
# Toggle our return status flag )
return False
return True
def _apply_overflow(self, body, title=None, overflow=None, def _apply_overflow(self, body, title=None, overflow=None,
body_format=None): body_format=None):

View File

@ -1,147 +0,0 @@
# -*- coding: utf-8 -*-
# BSD 3-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2023, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import sys
import asyncio
from functools import partial
from ..URLBase import URLBase
from ..logger import logger
# A global flag that tracks if we are Python v3.7 or higher
ASYNCIO_RUN_SUPPORT = \
sys.version_info.major > 3 or \
(sys.version_info.major == 3 and sys.version_info.minor >= 7)
async def notify(coroutines):
"""
An async wrapper to the AsyncNotifyBase.async_notify() calls allowing us
to call gather() and collect the responses
"""
# Create log entry
logger.info(
'Notifying {} service(s) asynchronously.'.format(len(coroutines)))
results = await asyncio.gather(*coroutines, return_exceptions=True)
# Returns True if all notifications succeeded, otherwise False is
# returned.
failed = any(not status or isinstance(status, Exception)
for status in results)
return not failed
def tosync(cor, debug=False):
"""
Await a coroutine from non-async code.
"""
if ASYNCIO_RUN_SUPPORT:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# There is no existing event loop, so we can start our own.
return asyncio.run(cor, debug=debug)
else:
# Enable debug mode
loop.set_debug(debug)
# Run the coroutine and wait for the result.
task = loop.create_task(cor)
return asyncio.ensure_future(task, loop=loop)
else:
# The Deprecated Way (<= Python v3.6)
try:
# acquire access to our event loop
loop = asyncio.get_event_loop()
except RuntimeError:
# This happens if we're inside a thread of another application
# where there is no running event_loop(). Pythong v3.7 and
# higher automatically take care of this case for us. But for
# the lower versions we need to do the following:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Enable debug mode
loop.set_debug(debug)
return loop.run_until_complete(cor)
async def toasyncwrapvalue(v):
"""
Create a coroutine that, when run, returns the provided value.
"""
return v
async def toasyncwrap(fn):
"""
Create a coroutine that, when run, executes the provided function.
"""
return fn()
class AsyncNotifyBase(URLBase):
"""
asyncio wrapper for the NotifyBase object
"""
async def async_notify(self, *args, **kwargs):
"""
Async Notification Wrapper
"""
loop = asyncio.get_event_loop()
try:
return await loop.run_in_executor(
None, partial(self.notify, *args, **kwargs))
except TypeError:
# These are our internally thrown notifications
pass
except Exception:
# A catch-all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
return False

View File

@ -31,9 +31,11 @@
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
from .rest import AppriseURLTester from .rest import AppriseURLTester
from .asyncio import OuterEventLoop
from .module import reload_plugin from .module import reload_plugin
__all__ = [ __all__ = [
'AppriseURLTester', 'AppriseURLTester',
'OuterEventLoop',
'reload_plugin', 'reload_plugin',
] ]

45
test/helpers/asyncio.py Normal file
View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import asyncio
class OuterEventLoop():
"""
An event loop that is easy to put up and tear down from synchronous test
code.
"""
def __init__(self):
self._loop = None
def __enter__(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._loop = loop
return loop
def __exit__(self, type, value, traceback):
asyncio.set_event_loop(None)
self._loop.close()

View File

@ -31,6 +31,7 @@
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
from __future__ import print_function from __future__ import print_function
import asyncio
import re import re
import sys import sys
import pytest import pytest
@ -56,12 +57,9 @@ from apprise import common
from apprise.plugins import __load_matrix from apprise.plugins import __load_matrix
from apprise.plugins import __reset_matrix from apprise.plugins import __reset_matrix
from apprise.utils import parse_list from apprise.utils import parse_list
from helpers import OuterEventLoop
import inspect import inspect
# Sending notifications requires the coroutines to be awaited, so we need to
# wrap the original function when mocking it.
import apprise.py3compat.asyncio as py3aio
# Disable logging for a cleaner testing output # Disable logging for a cleaner testing output
import logging import logging
logging.disable(logging.CRITICAL) logging.disable(logging.CRITICAL)
@ -86,10 +84,12 @@ def test_apprise_async():
API: Apprise() object asynchronous methods API: Apprise() object asynchronous methods
""" """
def do_notify(server, *args, **kwargs): with OuterEventLoop() as loop:
return py3aio.tosync(server.async_notify(*args, **kwargs)) def do_notify(server, *args, **kwargs):
return loop.run_until_complete(
server.async_notify(*args, **kwargs))
apprise_test(do_notify) apprise_test(do_notify)
def apprise_test(do_notify): def apprise_test(do_notify):
@ -299,6 +299,10 @@ def apprise_test(do_notify):
# Pretend everything is okay # Pretend everything is okay
raise TypeError() raise TypeError()
async def async_notify(self, **kwargs):
# Pretend everything is okay (async)
raise TypeError()
def url(self, **kwargs): def url(self, **kwargs):
# Support URL # Support URL
return '' return ''
@ -308,6 +312,10 @@ def apprise_test(do_notify):
# Pretend everything is okay # Pretend everything is okay
raise RuntimeError() raise RuntimeError()
async def async_notify(self, **kwargs):
# Pretend everything is okay (async)
raise TypeError()
def url(self, **kwargs): def url(self, **kwargs):
# Support URL # Support URL
return '' return ''
@ -318,6 +326,10 @@ def apprise_test(do_notify):
# Pretend everything is okay # Pretend everything is okay
return False return False
async def async_notify(self, **kwargs):
# Pretend everything is okay (async)
raise TypeError()
def url(self, **kwargs): def url(self, **kwargs):
# Support URL # Support URL
return '' return ''
@ -546,10 +558,12 @@ def test_apprise_tagging_async(mock_post, mock_get):
API: Apprise() object tagging functionality asynchronous methods API: Apprise() object tagging functionality asynchronous methods
""" """
def do_notify(server, *args, **kwargs): with OuterEventLoop() as loop:
return py3aio.tosync(server.async_notify(*args, **kwargs)) def do_notify(server, *args, **kwargs):
return loop.run_until_complete(
server.async_notify(*args, **kwargs))
apprise_tagging_test(mock_post, mock_get, do_notify) apprise_tagging_test(mock_post, mock_get, do_notify)
def apprise_tagging_test(mock_post, mock_get, do_notify): def apprise_tagging_test(mock_post, mock_get, do_notify):
@ -733,7 +747,10 @@ def test_apprise_notify_formats(tmpdir):
# other if/else parts of the code that aren't otherwise called # other if/else parts of the code that aren't otherwise called
__load_matrix() __load_matrix()
a = Apprise() # Need to set async_mode=False to call notify() instead of async_notify().
asset = AppriseAsset(async_mode=False)
a = Apprise(asset=asset)
# no items # no items
assert len(a) == 0 assert len(a) == 0
@ -743,7 +760,7 @@ def test_apprise_notify_formats(tmpdir):
notify_format = NotifyFormat.TEXT notify_format = NotifyFormat.TEXT
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__() super().__init__(**kwargs)
def notify(self, **kwargs): def notify(self, **kwargs):
# Pretend everything is okay # Pretend everything is okay
@ -759,7 +776,7 @@ def test_apprise_notify_formats(tmpdir):
notify_format = NotifyFormat.HTML notify_format = NotifyFormat.HTML
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__() super().__init__(**kwargs)
def notify(self, **kwargs): def notify(self, **kwargs):
# Pretend everything is okay # Pretend everything is okay
@ -775,7 +792,7 @@ def test_apprise_notify_formats(tmpdir):
notify_format = NotifyFormat.MARKDOWN notify_format = NotifyFormat.MARKDOWN
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__() super().__init__(**kwargs)
def notify(self, **kwargs): def notify(self, **kwargs):
# Pretend everything is okay # Pretend everything is okay
@ -1763,8 +1780,8 @@ def test_apprise_details_plugin_verification():
@mock.patch('requests.post') @mock.patch('requests.post')
@mock.patch('apprise.py3compat.asyncio.notify', wraps=py3aio.notify) @mock.patch('asyncio.gather', wraps=asyncio.gather)
def test_apprise_async_mode(mock_async_notify, mock_post, tmpdir): def test_apprise_async_mode(mock_gather, mock_post, tmpdir):
""" """
API: Apprise() async_mode tests API: Apprise() async_mode tests
@ -1798,8 +1815,8 @@ def test_apprise_async_mode(mock_async_notify, mock_post, tmpdir):
assert a.notify("async") is True assert a.notify("async") is True
# Verify our async code got executed # Verify our async code got executed
assert mock_async_notify.call_count == 1 assert mock_gather.call_count > 0
mock_async_notify.reset_mock() mock_gather.reset_mock()
# Provide an over-ride now # Provide an over-ride now
asset = AppriseAsset(async_mode=False) asset = AppriseAsset(async_mode=False)
@ -1823,9 +1840,9 @@ def test_apprise_async_mode(mock_async_notify, mock_post, tmpdir):
# Send Notifications Syncronously # Send Notifications Syncronously
assert a.notify("sync") is True assert a.notify("sync") is True
# Verify our async code got called # Sequential send doesn't require a gather
assert mock_async_notify.call_count == 1 assert mock_gather.call_count == 0
mock_async_notify.reset_mock() mock_gather.reset_mock()
# another way of looking a our false set asset configuration # another way of looking a our false set asset configuration
assert a[0].asset.async_mode is False assert a[0].asset.async_mode is False
@ -1847,8 +1864,8 @@ def test_apprise_async_mode(mock_async_notify, mock_post, tmpdir):
assert a.notify("a mixed batch") is True assert a.notify("a mixed batch") is True
# Verify our async code got called # Verify our async code got called
assert mock_async_notify.call_count == 1 assert mock_gather.call_count > 0
mock_async_notify.reset_mock() mock_gather.reset_mock()
def test_notify_matrix_dynamic_importing(tmpdir): def test_notify_matrix_dynamic_importing(tmpdir):

View File

@ -561,7 +561,7 @@ def test_apprise_config_with_apprise_obj(tmpdir):
super().__init__( super().__init__(
notify_format=NotifyFormat.HTML, **kwargs) notify_format=NotifyFormat.HTML, **kwargs)
def notify(self, **kwargs): async def async_notify(self, **kwargs):
# Pretend everything is okay # Pretend everything is okay
return True return True

View File

@ -39,8 +39,6 @@ from apprise import NotifyFormat
from apprise.common import NOTIFY_SCHEMA_MAP from apprise.common import NOTIFY_SCHEMA_MAP
import apprise.py3compat.asyncio as py3aio
# Disable logging for a cleaner testing output # Disable logging for a cleaner testing output
import logging import logging
logging.disable(logging.CRITICAL) logging.disable(logging.CRITICAL)
@ -87,7 +85,11 @@ def test_apprise_asyncio_runtime_error():
import asyncio import asyncio
# Get our event loop # Get our event loop
loop = asyncio.get_event_loop() try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = None
# Adjust out event loop to not point at anything # Adjust out event loop to not point at anything
asyncio.set_event_loop(None) asyncio.set_event_loop(None)
@ -102,67 +104,6 @@ def test_apprise_asyncio_runtime_error():
# enough to create a new event loop and continue... # enough to create a new event loop and continue...
assert a.notify(title="title", body="body") is True assert a.notify(title="title", body="body") is True
# Verify we have an active event loop
new_loop = asyncio.get_event_loop()
# We didn't throw an exception above; thus we have an event loop at
# this point
assert new_loop
# Close off the internal loop created inside a.notify()
new_loop.close()
finally: finally:
# Restore our event loop (in the event the above test failed) # Restore our event loop (in the event the above test failed)
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
@pytest.mark.skipif(sys.version_info < (3, 7),
reason="Requires Python 3.7+")
def test_apprise_works_in_async_loop():
"""
API: Apprise() can execute synchronously in an existing event loop
"""
class GoodNotification(NotifyBase):
def __init__(self, **kwargs):
super().__init__(
notify_format=NotifyFormat.HTML, **kwargs)
def url(self, **kwargs):
# Support URL
return ''
def send(self, **kwargs):
# Pretend everything is okay
return True
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return NotifyBase.parse_url(url, verify_host=False)
# Store our good notification in our schema map
NOTIFY_SCHEMA_MAP['good'] = GoodNotification
# Create ourselves an Apprise object
a = Apprise()
# Add a few entries
for _ in range(25):
a.add('good://')
# To ensure backwards compatibility, it should be possible to call
# asynchronous Apprise methods from code that already uses an event loop,
# even when using the synchronous notify() method.
# see https://github.com/caronc/apprise/issues/610
import asyncio
def try_notify():
a.notify(title="title", body="body")
# Convert to a coroutine to run asynchronously.
cor = py3aio.toasyncwrap(try_notify)
# Should execute successfully.
asyncio.run(cor)

View File

@ -68,6 +68,10 @@ def test_apprise_cli_nux_env(tmpdir):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def notify(self, **kwargs): def notify(self, **kwargs):
# Pretend everything is okay (when passing --disable-async)
return True
async def async_notify(self, **kwargs):
# Pretend everything is okay # Pretend everything is okay
return True return True
@ -79,8 +83,8 @@ def test_apprise_cli_nux_env(tmpdir):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def notify(self, **kwargs): async def async_notify(self, **kwargs):
# Force a notification failure # Pretend everything is okay
return False return False
def url(self, *args, **kwargs): def url(self, *args, **kwargs):