Fix XMPP plugin (#217)

This commit is contained in:
Andreas Motl 2020-03-27 16:54:01 +01:00 committed by GitHub
parent 3895ad9b58
commit 7c251bf35c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 189 additions and 152 deletions

1
.gitignore vendored
View File

@ -12,6 +12,7 @@ __pycache__/
# Distribution / packaging
.Python
env/
.venv*
build/
develop-eggs/
dist/

View File

@ -7,7 +7,7 @@
## Contributors
The following users have contributed to this project and their deserved
recogition has been identified here. If you have contributed and wish
recognition has been identified here. If you have contributed and wish
to be acknowledged for it, the syntax is as follows:
```
@ -21,3 +21,6 @@ The contributors have been listed in chronological order:
* Hitesh Sondhi <hitesh@cropsly.com>
* Mar 2019 - Added Flock Support
* Andreas Motl <andreas@getkotori.org>
* Mar 2020 - Fix XMPP Support

View File

@ -25,6 +25,7 @@
import re
import ssl
import logging
from os.path import isfile
from .NotifyBase import NotifyBase
@ -82,6 +83,9 @@ class NotifyXMPP(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_xmpp'
# Lower throttle rate for XMPP
request_rate_per_sec = 0.5
# The default XMPP port
default_unsecure_port = 5222
@ -267,34 +271,7 @@ class NotifyXMPP(NotifyBase):
jid = self.host
password = self.password if self.password else self.user
# Prepare our object
xmpp = sleekxmpp.ClientXMPP(jid, password)
for xep in self.xep:
# Load xep entries
xmpp.register_plugin('xep_{0:04d}'.format(xep))
if self.secure:
xmpp.ssl_version = ssl.PROTOCOL_TLSv1
# If the python version supports it, use highest TLS version
# automatically
if hasattr(ssl, "PROTOCOL_TLS"):
# Use the best version of TLS available to us
xmpp.ssl_version = ssl.PROTOCOL_TLS
xmpp.ca_certs = None
if self.verify_certificate:
# Set the ca_certs variable for certificate verification
xmpp.ca_certs = next(
(cert for cert in CA_CERTIFICATE_FILE_LOCATIONS
if isfile(cert)), None)
if xmpp.ca_certs is None:
self.logger.warning(
'XMPP Secure comunication can not be verified; '
'no CA certificate found')
# Acquire our port number
# Compute port number
if not self.port:
port = self.default_secure_port \
if self.secure else self.default_unsecure_port
@ -302,48 +279,23 @@ class NotifyXMPP(NotifyBase):
else:
port = self.port
# Establish our connection
if not xmpp.connect((self.host, port)):
return False
xmpp.send_presence()
try:
xmpp.get_roster()
except sleekxmpp.exceptions.IqError as e:
self.logger.warning('There was an error getting the XMPP roster.')
self.logger.debug(e.iq['error']['condition'])
xmpp.disconnect()
return False
except sleekxmpp.exceptions.IqTimeout:
self.logger.warning('XMPP Server is taking too long to respond.')
xmpp.disconnect()
return False
targets = list(self.targets)
if not targets:
# We always default to notifying ourselves
targets.append(jid)
while len(targets) > 0:
# Get next target (via JID)
target = targets.pop(0)
# Always call throttle before any remote server i/o is made
# Handler function to be called before each message.
# Always call throttle before any remote server i/o is made.
def on_before_message():
self.throttle()
# The message we wish to send, and the JID that
# will receive it.
xmpp.send_message(mto=target, mbody=body, mtype='chat')
# Communicate with XMPP.
xmpp_adapter = SleekXmppAdapter(
host=self.host, port=port, secure=self.secure,
verify_certificate=self.verify_certificate,
xep=self.xep, jid=jid, password=password,
body=body, targets=self.targets, before_message=on_before_message,
logger=self.logger)
# Using wait=True ensures that the send queue will be
# emptied before ending the session.
xmpp.disconnect(wait=True)
# Initialize XMPP machinery and begin processing the XML stream.
outcome = xmpp_adapter.process()
return True
return outcome
def url(self, privacy=False, *args, **kwargs):
"""
@ -427,3 +379,132 @@ class NotifyXMPP(NotifyBase):
NotifyXMPP.parse_list(results['qsd']['to'])
return results
class SleekXmppAdapter(object):
"""
Wrapper to SleekXmpp
"""
def __init__(self,
host=None, port=None, secure=None, verify_certificate=None,
xep=None, jid=None, password=None, body=None, targets=None,
before_message=None, logger=None):
self.host = host
self.port = port
self.secure = secure
self.verify_certificate = verify_certificate
self.xep = xep
self.jid = jid
self.password = password
self.body = body
self.targets = targets
self.before_message = before_message
self.logger = logger or logging.getLogger(__name__)
# Reference to XMPP client.
self.xmpp = None
# Whether everything succeeded.
self.success = False
self.configure_logging()
self.setup()
def configure_logging(self):
# Use the Apprise log handlers for configuring
# the sleekxmpp logger.
apprise_logger = logging.getLogger('apprise')
sleek_logger = logging.getLogger('sleekxmpp')
for handler in apprise_logger.handlers:
sleek_logger.addHandler(handler)
sleek_logger.setLevel(apprise_logger.level)
def setup(self):
# Prepare our object
self.xmpp = sleekxmpp.ClientXMPP(self.jid, self.password)
self.xmpp.add_event_handler("session_start", self.session_start)
self.xmpp.add_event_handler("failed_auth", self.failed_auth)
for xep in self.xep:
# Load xep entries
self.xmpp.register_plugin('xep_{0:04d}'.format(xep))
if self.secure:
# Don't even try to use the outdated ssl.PROTOCOL_SSLx
self.xmpp.ssl_version = ssl.PROTOCOL_TLSv1
# If the python version supports it, use highest TLS version
# automatically
if hasattr(ssl, "PROTOCOL_TLS"):
# Use the best version of TLS available to us
self.xmpp.ssl_version = ssl.PROTOCOL_TLS
self.xmpp.ca_certs = None
if self.verify_certificate:
# Set the ca_certs variable for certificate verification
self.xmpp.ca_certs = next(
(cert for cert in CA_CERTIFICATE_FILE_LOCATIONS
if isfile(cert)), None)
if self.xmpp.ca_certs is None:
self.logger.warning(
'XMPP Secure comunication can not be verified; '
'no CA certificate found')
def process(self):
# Establish connection to XMPP server.
# To speed up sending messages, don't use the "reattempt" feature,
# it will add a nasty delay even before connecting to XMPP server.
if not self.xmpp.connect((self.host, self.port),
use_ssl=self.secure, reattempt=False):
return False
# Process XMPP communication.
self.xmpp.process(block=True)
return self.success
def session_start(self, event):
"""
Session Manager
"""
targets = list(self.targets)
if not targets:
# We always default to notifying ourselves
targets.append(self.jid)
while len(targets) > 0:
# Get next target (via JID)
target = targets.pop(0)
# Invoke "before_message" event hook.
# Here, it will indirectly invoke the throttling feature,
# which adds a delay before any remote server i/o is made.
if callable(self.before_message):
self.before_message()
# The message we wish to send, and the JID that will receive it.
self.xmpp.send_message(mto=target, mbody=self.body, mtype='chat')
# Using wait=True ensures that the send queue will be
# emptied before ending the session.
self.xmpp.disconnect(wait=True)
self.success = True
def failed_auth(self, event):
self.logger.error('Authentication with XMPP server failed')

View File

@ -6,3 +6,6 @@ pytest-cov
tox
babel
cryptography
# Plugin Dependencies
sleekxmpp

View File

@ -24,9 +24,9 @@
# THE SOFTWARE.
import six
import mock
import sys
import ssl
import mock
import apprise
@ -49,47 +49,10 @@ logging.disable(logging.CRITICAL)
def test_xmpp_plugin(tmpdir):
"""
API: NotifyXMPP Plugin()
"""
# Our module base
sleekxmpp_name = 'sleekxmpp'
# First we do an import without the sleekxmpp library available to ensure
# we can handle cases when the library simply isn't available
if sleekxmpp_name in sys.modules:
# Test cases where the sleekxmpp library exists; we want to remove it
# for the purpose of testing and capture the handling of the
# library when it is missing
del sys.modules[sleekxmpp_name]
reload(sys.modules['apprise.plugins.NotifyXMPP'])
# We need to fake our gnome environment for testing purposes since
# the sleekxmpp library isn't available in Travis CI
sys.modules[sleekxmpp_name] = mock.MagicMock()
xmpp = mock.Mock()
xmpp.register_plugin.return_value = True
xmpp.send_message.return_value = True
xmpp.connect.return_value = True
xmpp.disconnect.return_value = True
xmpp.send_presence.return_value = True
xmpp.get_roster.return_value = True
xmpp.ssl_version = None
class IqError(Exception):
iq = {'error': {'condition': 'test'}}
pass
class IqTimeout(Exception):
pass
# Setup our Exceptions
sys.modules[sleekxmpp_name].exceptions.IqError = IqError
sys.modules[sleekxmpp_name].exceptions.IqTimeout = IqTimeout
sys.modules[sleekxmpp_name].ClientXMPP.return_value = xmpp
# Mock the sleekxmpp module completely.
sys.modules['sleekxmpp'] = mock.MagicMock()
# The following libraries need to be reloaded to prevent
# TypeError: super(type, obj): obj must be an instance or subtype of type
@ -103,9 +66,17 @@ def test_xmpp_plugin(tmpdir):
reload(sys.modules['apprise.Apprise'])
reload(sys.modules['apprise'])
# An empty CA list
sys.modules['apprise.plugins.NotifyXMPP']\
.CA_CERTIFICATE_FILE_LOCATIONS = []
# Mock the XMPP adapter to override "self.success".
# This will signal a successful message delivery.
from apprise.plugins.NotifyXMPP import SleekXmppAdapter
class MockedSleekXmppAdapter(SleekXmppAdapter):
def __init__(self, *args, **kwargs):
super(MockedSleekXmppAdapter, self).__init__(*args, **kwargs)
self.success = True
NotifyXMPP = sys.modules['apprise.plugins.NotifyXMPP']
NotifyXMPP.SleekXmppAdapter = MockedSleekXmppAdapter
# Disable Throttling to speed testing
apprise.plugins.NotifyBase.request_rate_per_sec = 0
@ -136,7 +107,7 @@ def test_xmpp_plugin(tmpdir):
del ssl.PROTOCOL_TLS
# Test our URL
url = 'xmpps://user:pass@example.com'
url = 'xmpps://user:pass@localhost'
obj = apprise.Apprise.instantiate(url, suppress_exceptions=False)
# Test we loaded
assert isinstance(obj, apprise.plugins.NotifyXMPP) is True
@ -151,7 +122,7 @@ def test_xmpp_plugin(tmpdir):
# Handle case where it is not missing
setattr(ssl, 'PROTOCOL_TLS', ssl.PROTOCOL_TLSv1)
# Test our URL
url = 'xmpps://user:pass@example.com'
url = 'xmpps://user:pass@localhost'
obj = apprise.Apprise.instantiate(url, suppress_exceptions=False)
# Test we loaded
assert isinstance(obj, apprise.plugins.NotifyXMPP) is True
@ -164,31 +135,31 @@ def test_xmpp_plugin(tmpdir):
urls = (
{
'u': 'xmpps://user:pass@example.com',
'p': 'xmpps://user:****@example.com',
'u': 'xmpps://user:pass@localhost',
'p': 'xmpps://user:****@localhost',
}, {
'u': 'xmpps://user:pass@example.com?'
'u': 'xmpps://user:pass@localhost?'
'xep=30,199,garbage,xep_99999999',
'p': 'xmpps://user:****@example.com',
'p': 'xmpps://user:****@localhost',
}, {
'u': 'xmpps://user:pass@example.com?xep=ignored',
'p': 'xmpps://user:****@example.com',
'u': 'xmpps://user:pass@localhost?xep=ignored',
'p': 'xmpps://user:****@localhost',
}, {
'u': 'xmpps://pass@example.com/'
'u': 'xmpps://pass@localhost/'
'user@test.com, user2@test.com/resource',
'p': 'xmpps://****@example.com',
'p': 'xmpps://****@localhost',
}, {
'u': 'xmpps://pass@example.com:5226?jid=user@test.com',
'p': 'xmpps://****@example.com:5226',
'u': 'xmpps://pass@localhost:5226?jid=user@test.com',
'p': 'xmpps://****@localhost:5226',
}, {
'u': 'xmpps://pass@example.com?jid=user@test.com&verify=False',
'p': 'xmpps://****@example.com',
'u': 'xmpps://pass@localhost?jid=user@test.com&verify=False',
'p': 'xmpps://****@localhost',
}, {
'u': 'xmpps://user:pass@example.com?verify=False',
'p': 'xmpps://user:****@example.com',
'u': 'xmpps://user:pass@localhost?verify=False',
'p': 'xmpps://user:****@localhost',
}, {
'u': 'xmpp://user:pass@example.com?to=user@test.com',
'p': 'xmpp://user:****@example.com',
'u': 'xmpp://user:pass@localhost?to=user@test.com',
'p': 'xmpp://user:****@localhost',
}
)
@ -241,31 +212,9 @@ def test_xmpp_plugin(tmpdir):
.CA_CERTIFICATE_FILE_LOCATIONS = [str(ca_cert), ]
obj = apprise.Apprise.instantiate(
'xmpps://pass@example.com/user@test.com',
'xmpps://pass@localhost/user@test.com',
suppress_exceptions=False)
# Our notification now should be able to get a ca_cert to reference
assert obj.notify(
title='', body='body', notify_type=apprise.NotifyType.INFO) is True
# Test Connect Failures
xmpp.connect.return_value = False
assert obj.notify(
title='', body='body', notify_type=apprise.NotifyType.INFO) is False
# Return our object value so we don't obstruct other tests
xmpp.connect.return_value = True
# Test Exceptions
xmpp.get_roster.side_effect = \
sys.modules[sleekxmpp_name].exceptions.IqTimeout()
assert obj.notify(
title='', body='body', notify_type=apprise.NotifyType.INFO) is False
xmpp.get_roster.side_effect = None
xmpp.get_roster.side_effect = \
sys.modules[sleekxmpp_name].exceptions.IqError()
assert obj.notify(
title='', body='body', notify_type=apprise.NotifyType.INFO) is False
xmpp.get_roster.side_effect = None