Python asyncio Integration - Notifications sent Asynchronously (#273)

This commit is contained in:
Chris Caron 2020-08-15 20:28:15 -04:00 committed by GitHub
parent 4f4b15d47e
commit aa039e74b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 259 additions and 13 deletions

View File

@ -10,11 +10,13 @@ To inform or tell (someone). To make one aware of something.
* One notification library to rule them all.
* A common and intuitive notification syntax.
* Supports the handling of images and attachments (to the notification services that will accept them).
* Supports the handling of images and attachments (_to the notification services that will accept them_).
* It's incredibly lightweight.
* Amazing response times because all messages sent asyncronously.
System owners who wish to provide a notification service no longer need to research each and every new one as they appear. They just need to include this one library and then they can immediately gain access to almost all of the notifications services available to us today.
Developers who wish to provide a notification service no longer need to research each and every one out there. They no longer need to try to adapt to the new ones that comeout thereafter. They just need to include this one library and then they can immediately gain access to almost all of the notifications services available to us today.
System Administrators who wish to send a notification from a scheduled task or from the command line also no longer need to find the right tool for the job. Everything is already wrapped and supported within the *apprise* script that ships with this product.
System Administrators and DevOps who wish to send a notification now no longer need to find the right tool for the job. Everything is already wrapped and supported within the `apprise` command line tool (CLI) that ships with this product.
[![Paypal](https://img.shields.io/badge/paypal-donate-green.svg)](https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=MHANV39UZNQ5E)
[![Follow](https://img.shields.io/twitter/follow/l2gnux)](https://twitter.com/l2gnux/)<br/>

View File

@ -46,13 +46,19 @@ from .plugins.NotifyBase import NotifyBase
from . import plugins
from . import __version__
# Python v3+ support code made importable so it can remain backwards
# compatible with Python v2
from . import py3compat
ASYNCIO_SUPPORT = not six.PY2
class Apprise(object):
"""
Our Notification Manager
"""
def __init__(self, servers=None, asset=None):
def __init__(self, servers=None, asset=None, debug=False):
"""
Loads a set of server urls while applying the Asset() module to each
if specified.
@ -78,6 +84,9 @@ class Apprise(object):
# Initialize our locale object
self.locale = AppriseLocale()
# Set our debug flag
self.debug = debug
@staticmethod
def instantiate(url, asset=None, tag=None, suppress_exceptions=True):
"""
@ -326,6 +335,10 @@ class Apprise(object):
body_format = self.asset.body_format \
if body_format is None else body_format
# for asyncio support; we track a list of our servers to notify
# sequentially
coroutines = []
# Iterate over our loaded plugins
for server in self.find(tag):
if status is None:
@ -383,6 +396,18 @@ class Apprise(object):
# Store entry directly
conversion_map[server.notify_format] = body
if ASYNCIO_SUPPORT and server.asset.async_mode:
# Build a list of servers requiring notification
# that will be triggered asynchronously afterwards
coroutines.append(server.async_notify(
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type,
attach=attach))
# We gather at this point and notify at the end
continue
try:
# Send notification
if not server.notify(
@ -404,6 +429,12 @@ class Apprise(object):
logger.exception("Notification Exception")
status = False
if coroutines:
# perform our async notification(s)
if not py3compat.asyncio.notify(coroutines, debug=self.debug):
# Toggle our status only if we had a failure
status = False
return status
def details(self, lang=None):

View File

@ -99,6 +99,12 @@ class AppriseAsset(object):
# will be the default.
body_format = None
# Always attempt to send notifications asynchronous (as the same time
# if possible)
# This is a Python 3 supported option only. If set to False, then
# notifications are sent sequentially (one after another)
async_mode = True
def __init__(self, **kwargs):
"""
Asset Initialization

View File

@ -123,6 +123,8 @@ def print_version_msg():
'which services to notify. Use multiple --tag (-g) entries to '
'"OR" the tags together and comma separated to "AND" them. '
'If no tags are specified then all services are notified.')
@click.option('--disable-async', '-Da', is_flag=True,
help='Send all notifications sequentially')
@click.option('--dry-run', '-d', is_flag=True,
help='Perform a trial run but only prints the notification '
'services to-be triggered to stdout. Notifications are never '
@ -130,12 +132,13 @@ def print_version_msg():
@click.option('--verbose', '-v', count=True,
help='Makes the operation more talkative. Use multiple v to '
'increase the verbosity. I.e.: -vvvv')
@click.option('--debug', '-D', is_flag=True, help='Debug mode')
@click.option('--version', '-V', is_flag=True,
help='Display the apprise version and exit.')
@click.argument('urls', nargs=-1,
metavar='SERVER_URL [SERVER_URL2 [SERVER_URL3]]',)
def main(body, title, config, attach, urls, notification_type, theme, tag,
input_format, dry_run, verbose, version):
input_format, dry_run, verbose, disable_async, debug, version):
"""
Send a notification to all of the specified servers identified by their
URLs the content provided within the title, body and notification-type.
@ -147,6 +150,11 @@ def main(body, title, config, attach, urls, notification_type, theme, tag,
# want to return a specific error code, you must call sys.exit()
# as you will see below.
debug = True if debug else False
if debug:
# Verbosity must be a minimum of 3
verbose = 3 if verbose < 3 else verbose
# Logging
ch = logging.StreamHandler(sys.stdout)
if verbose > 3:
@ -175,6 +183,12 @@ def main(body, title, config, attach, urls, notification_type, theme, tag,
ch.setFormatter(formatter)
logger.addHandler(ch)
# Update our asyncio logger
asyncio_logger = logging.getLogger('asyncio')
for handler in logger.handlers:
asyncio_logger.addHandler(handler)
asyncio_logger.setLevel(logger.level)
if version:
print_version_msg()
sys.exit(0)
@ -195,10 +209,18 @@ def main(body, title, config, attach, urls, notification_type, theme, tag,
sys.exit(1)
# Prepare our asset
asset = AppriseAsset(body_format=input_format, theme=theme)
asset = AppriseAsset(
body_format=input_format,
theme=theme,
# Async mode is only used for Python v3+ and allows a user to send
# all of their notifications asyncronously. This was made an option
# incase there are problems in the future where it's better that
# everything run sequentially/syncronously instead.
async_mode=disable_async is not True,
)
# Create our object
a = Apprise(asset=asset)
# Create our Apprise object
a = Apprise(asset=asset, debug=debug)
# Load our configuration if no URLs or specified configuration was
# identified on the command line

View File

@ -24,6 +24,7 @@
# THE SOFTWARE.
import re
import six
from ..URLBase import URLBase
from ..common import NotifyType
@ -36,7 +37,17 @@ from ..AppriseLocale import gettext_lazy as _
from ..AppriseAttachment import AppriseAttachment
class NotifyBase(URLBase):
if six.PY3:
# Wrap our base with the asyncio wrapper
from ..py3compat.asyncio import AsyncNotifyBase
BASE_OBJECT = AsyncNotifyBase
else:
# Python v2.7 (backwards compatibility)
BASE_OBJECT = URLBase
class NotifyBase(BASE_OBJECT):
"""
This is the base class for all notification services
"""

View File

View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import asyncio
from ..URLBase import URLBase
from ..logger import logger
# A global flag that tracks if we are Python v3.7 or higher
ASYNCIO_RUN_SUPPORT = \
sys.version_info.major > 3 or \
(sys.version_info.major == 3 and sys.version_info.minor >= 7)
def notify(coroutines, debug=False):
"""
A Wrapper to the AsyncNotifyBase.async_notify() calls allowing us
to call gather() and collect the responses
"""
# Create log entry
logger.info(
'Notifying {} service(s) asynchronous.'.format(len(coroutines)))
if ASYNCIO_RUN_SUPPORT:
# async reference produces a SyntaxError (E999) in Python v2.7
# For this reason we turn on the noqa flag
async def main(results, coroutines): # noqa: E999
"""
Task: Notify all servers specified and return our result set
through a mutable object.
"""
# send our notifications and store our result set into
# our results dictionary
results['response'] = \
await asyncio.gather(*coroutines, return_exceptions=True)
# Initialize a mutable object we can populate with our notification
# responses
results = {}
# Send our notifications
asyncio.run(main(results, coroutines), debug=debug)
# Acquire our return status
status = next((s for s in results['response'] if s is False), True)
else:
#
# The depricated way
#
# acquire access to our event loop
loop = asyncio.get_event_loop()
if debug:
# Enable debug mode
loop.set_debug(1)
# Send our notifications and acquire our status
results = loop.run_until_complete(asyncio.gather(*coroutines))
# Acquire our return status
status = next((r for r in results if r is False), True)
# Returns True if all notifications succeeded, otherwise False is
# returned.
return status
class AsyncNotifyBase(URLBase):
"""
asyncio wrapper for the NotifyBase object
"""
async def async_notify(self, *args, **kwargs): # noqa: E999
"""
Async Notification Wrapper
"""
try:
return self.notify(*args, **kwargs)
except TypeError:
# These our our internally thrown notifications
pass
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
return False

View File

@ -1,7 +1,7 @@
.\" generated with Ronn/v0.7.3
.\" http://github.com/rtomayko/ronn/tree/0.7.3
.
.TH "APPRISE" "1" "July 2020" "" ""
.TH "APPRISE" "1" "August 2020" "" ""
.
.SH "NAME"
\fBapprise\fR \- Push Notifications that work with just about every platform!
@ -69,6 +69,14 @@ Perform a trial run but only prints the notification services to\-be triggered t
The more of these you specify, the more verbose the output is\.
.
.TP
\fB\-Da\fR, \fB\-\-disable\-async\fR
Send notifications synchronously (one after the other) instead of all at once\.
.
.TP
\fB\-D\fR, \fB\-\-debug\fR
A debug mode; useful for troubleshooting\.
.
.TP
\fB\-V\fR, \fB\-\-version\fR
Display the apprise version and exit\.
.

View File

@ -56,6 +56,13 @@ The Apprise options are as follows:
* `-v`, `--verbose`:
The more of these you specify, the more verbose the output is.
* `-Da`, `--disable-async`:
Send notifications synchronously (one after the other) instead of
all at once.
* `-D`, `--debug`:
A debug mode; useful for troubleshooting.
* `-V`, `--version`:
Display the apprise version and exit.

View File

@ -48,9 +48,9 @@ import logging
logging.disable(logging.CRITICAL)
def test_apprise_cli(tmpdir):
def test_apprise_cli_nux_env(tmpdir):
"""
API: Apprise() CLI
CLI: Nux Environment
"""
@ -110,12 +110,47 @@ def test_apprise_cli(tmpdir):
])
assert result.exit_code == 0
# Run in synchronous mode
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'good://localhost',
'--disable-async',
])
assert result.exit_code == 0
# Test Debug Mode (--debug)
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'good://localhost',
'--debug',
])
assert result.exit_code == 0
# Test Debug Mode (-D)
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'good://localhost',
'-D',
])
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'-t', 'test title',
'good://localhost',
], input='test stdin body\n')
assert result.exit_code == 0
# Run in synchronous mode
result = runner.invoke(cli.main, [
'-t', 'test title',
'good://localhost',
'--disable-async',
], input='test stdin body\n')
assert result.exit_code == 0
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
@ -123,6 +158,15 @@ def test_apprise_cli(tmpdir):
])
assert result.exit_code == 1
# Run in synchronous mode
result = runner.invoke(cli.main, [
'-t', 'test title',
'-b', 'test body',
'bad://localhost',
'-Da',
])
assert result.exit_code == 1
# Testing with the --dry-run flag reveals a successful response since we
# don't actually execute the bad:// notification; we only display it
result = runner.invoke(cli.main, [
@ -302,7 +346,7 @@ def test_apprise_cli(tmpdir):
@mock.patch('platform.system')
def test_apprise_cli_windows_env(mock_system):
"""
API: Apprise() CLI Windows Environment
CLI: Windows Environment
"""
# Force a windows environment