mirror of
https://github.com/caronc/apprise.git
synced 2024-11-08 09:14:53 +01:00
Added DAPNET/Hampager Support (#506)
This commit is contained in:
parent
c06a2f3f87
commit
9242445126
@ -99,6 +99,7 @@ The table below identifies the services this tool supports and some example serv
|
||||
| -------------------- | ---------- | ------------ | -------------- |
|
||||
| [AWS SNS](https://github.com/caronc/apprise/wiki/Notify_sns) | sns:// | (TCP) 443 | sns://AccessKeyID/AccessSecretKey/RegionName/+PhoneNo<br/>sns://AccessKeyID/AccessSecretKey/RegionName/+PhoneNo1/+PhoneNo2/+PhoneNoN<br/>sns://AccessKeyID/AccessSecretKey/RegionName/Topic<br/>sns://AccessKeyID/AccessSecretKey/RegionName/Topic1/Topic2/TopicN
|
||||
| [ClickSend](https://github.com/caronc/apprise/wiki/Notify_clicksend) | clicksend:// | (TCP) 443 | clicksend://user:pass@PhoneNo<br/>clicksend://user:pass@ToPhoneNo1/ToPhoneNo2/ToPhoneNoN
|
||||
| [DAPNET](https://github.com/caronc/apprise/wiki/Notify_dapnet) | dapnet:// | (TCP) 80 | dapnet://user:pass@callsign<br/>dapnet://user:pass@callsign1/callsign2/callsignN
|
||||
| [D7 Networks](https://github.com/caronc/apprise/wiki/Notify_d7networks) | d7sms:// | (TCP) 443 | d7sms://user:pass@PhoneNo<br/>d7sms://user:pass@ToPhoneNo1/ToPhoneNo2/ToPhoneNoN
|
||||
| [DingTalk](https://github.com/caronc/apprise/wiki/Notify_dingtalk) | dingtalk:// | (TCP) 443 | dingtalk://token/<br />dingtalk://token/ToPhoneNo<br />dingtalk://token/ToPhoneNo1/ToPhoneNo2/ToPhoneNo1/
|
||||
| [Kavenegar](https://github.com/caronc/apprise/wiki/Notify_kavenegar) | kavenegar:// | (TCP) 443 | kavenegar://ApiKey/ToPhoneNo<br/>kavenegar://FromPhoneNo@ApiKey/ToPhoneNo<br/>kavenegar://ApiKey/ToPhoneNo1/ToPhoneNo2/ToPhoneNoN
|
||||
|
393
apprise/plugins/NotifyDapnet.py
Normal file
393
apprise/plugins/NotifyDapnet.py
Normal file
@ -0,0 +1,393 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
|
||||
# All rights reserved.
|
||||
#
|
||||
# This code is licensed under the MIT License.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files(the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions :
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
|
||||
# To use this plugin, sign up with Hampager (you need to be a licensed
|
||||
# ham radio operator
|
||||
# http://www.hampager.de/
|
||||
#
|
||||
# You're done at this point, you only need to know your user/pass that
|
||||
# you signed up with.
|
||||
|
||||
# The following URLs would be accepted by Apprise:
|
||||
# - dapnet://{user}:{password}@{callsign}
|
||||
# - dapnet://{user}:{password}@{callsign1}/{callsign2}
|
||||
|
||||
# Optional parameters:
|
||||
# - priority (NORMAL or EMERGENCY). Default: NORMAL
|
||||
# - txgroups --> comma-separated list of DAPNET transmitter
|
||||
# groups. Default: 'dl-all'
|
||||
# https://hampager.de/#/transmitters/groups
|
||||
|
||||
from json import dumps
|
||||
|
||||
# The API reference used to build this plugin was documented here:
|
||||
# https://hampager.de/dokuwiki/doku.php#dapnet_api
|
||||
#
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from .NotifyBase import NotifyBase
|
||||
from ..AppriseLocale import gettext_lazy as _
|
||||
from ..URLBase import PrivacyMode
|
||||
from ..common import NotifyType
|
||||
from ..utils import is_call_sign
|
||||
from ..utils import parse_call_sign
|
||||
from ..utils import parse_list
|
||||
from ..utils import parse_bool
|
||||
|
||||
|
||||
class DapnetPriority(object):
|
||||
NORMAL = 0
|
||||
EMERGENCY = 1
|
||||
|
||||
|
||||
DAPNET_PRIORITIES = (
|
||||
DapnetPriority.NORMAL,
|
||||
DapnetPriority.EMERGENCY,
|
||||
)
|
||||
|
||||
|
||||
class NotifyDapnet(NotifyBase):
|
||||
"""
|
||||
A wrapper for DAPNET / Hampager Notifications
|
||||
"""
|
||||
|
||||
# The default descriptive name associated with the Notification
|
||||
service_name = 'Dapnet'
|
||||
|
||||
# The services URL
|
||||
service_url = 'https://hampager.de/'
|
||||
|
||||
# The default secure protocol
|
||||
secure_protocol = 'dapnet'
|
||||
|
||||
# A URL that takes you to the setup/help of the specific protocol
|
||||
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_dapnet'
|
||||
|
||||
# Dapnet uses the http protocol with JSON requests
|
||||
notify_url = 'http://www.hampager.de:8080/calls'
|
||||
|
||||
# The maximum length of the body
|
||||
body_maxlen = 80
|
||||
|
||||
# A title can not be used for Dapnet Messages. Setting this to zero will
|
||||
# cause any title (if defined) to get placed into the message body.
|
||||
title_maxlen = 0
|
||||
|
||||
# The maximum amount of emails that can reside within a single transmission
|
||||
default_batch_size = 50
|
||||
|
||||
# Define object templates
|
||||
templates = ('{schema}://{user}:{password}@{targets}',)
|
||||
|
||||
# Define our template tokens
|
||||
template_tokens = dict(
|
||||
NotifyBase.template_tokens,
|
||||
**{
|
||||
'user': {
|
||||
'name': _('User Name'),
|
||||
'type': 'string',
|
||||
'required': True,
|
||||
},
|
||||
'password': {
|
||||
'name': _('Password'),
|
||||
'type': 'string',
|
||||
'private': True,
|
||||
'required': True,
|
||||
},
|
||||
'target_callsign': {
|
||||
'name': _('Target Callsign'),
|
||||
'type': 'string',
|
||||
'regex': (
|
||||
r'^[a-z0-9]{2,5}(-[a-z0-9]{1,2})?$', 'i',
|
||||
),
|
||||
'map_to': 'targets',
|
||||
},
|
||||
'targets': {
|
||||
'name': _('Targets'),
|
||||
'type': 'list:string',
|
||||
'required': True,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
# Define our template arguments
|
||||
template_args = dict(
|
||||
NotifyBase.template_args,
|
||||
**{
|
||||
'to': {
|
||||
'name': _('Target Callsign'),
|
||||
'type': 'string',
|
||||
'map_to': 'targets',
|
||||
},
|
||||
'priority': {
|
||||
'name': _('Priority'),
|
||||
'type': 'choice:int',
|
||||
'values': DAPNET_PRIORITIES,
|
||||
'default': DapnetPriority.NORMAL,
|
||||
},
|
||||
'txgroups': {
|
||||
'name': _('Transmitter Groups'),
|
||||
'type': 'string',
|
||||
'default': 'dl-all',
|
||||
'private': True,
|
||||
},
|
||||
'batch': {
|
||||
'name': _('Batch Mode'),
|
||||
'type': 'bool',
|
||||
'default': False,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
def __init__(self, targets=None, priority=None, txgroups=None,
|
||||
batch=False, **kwargs):
|
||||
"""
|
||||
Initialize Dapnet Object
|
||||
"""
|
||||
super(NotifyDapnet, self).__init__(**kwargs)
|
||||
|
||||
# Parse our targets
|
||||
self.targets = list()
|
||||
|
||||
# get the emergency prio setting
|
||||
if priority not in DAPNET_PRIORITIES:
|
||||
self.priority = self.template_args['priority']['default']
|
||||
else:
|
||||
self.priority = priority
|
||||
|
||||
if not (self.user and self.password):
|
||||
msg = 'A Dapnet user/pass was not provided.'
|
||||
self.logger.warning(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
# Get the transmitter group
|
||||
self.txgroups = parse_list(
|
||||
NotifyDapnet.template_args['txgroups']['default']
|
||||
if not txgroups else txgroups)
|
||||
|
||||
# Prepare Batch Mode Flag
|
||||
self.batch = batch
|
||||
|
||||
for target in parse_call_sign(targets):
|
||||
# Validate targets and drop bad ones:
|
||||
result = is_call_sign(target)
|
||||
if not result:
|
||||
self.logger.warning(
|
||||
'Dropping invalid Amateur radio call sign ({}).'.format(
|
||||
target),
|
||||
)
|
||||
continue
|
||||
|
||||
# Store callsign
|
||||
self.targets.append(result['callsign'])
|
||||
|
||||
return
|
||||
|
||||
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
|
||||
"""
|
||||
Perform Dapnet Notification
|
||||
"""
|
||||
|
||||
if not self.targets:
|
||||
# There is no one to email; we're done
|
||||
self.logger.warning(
|
||||
'There are no Amateur radio callsigns to notify')
|
||||
return False
|
||||
|
||||
# Send in batches if identified to do so
|
||||
batch_size = 1 if not self.batch else self.default_batch_size
|
||||
|
||||
headers = {
|
||||
'User-Agent': self.app_id,
|
||||
'Content-Type': 'application/json; charset=utf-8',
|
||||
}
|
||||
|
||||
# error tracking (used for function return)
|
||||
has_error = False
|
||||
|
||||
# prepare the emergency mode
|
||||
emergency_mode = True \
|
||||
if self.priority == DapnetPriority.EMERGENCY else False
|
||||
|
||||
# Create a copy of the targets list
|
||||
targets = list(self.targets)
|
||||
|
||||
for index in range(0, len(targets), batch_size):
|
||||
|
||||
# prepare JSON payload
|
||||
payload = {
|
||||
'text': body,
|
||||
'callSignNames': targets[index:index + batch_size],
|
||||
'transmitterGroupNames': self.txgroups,
|
||||
'emergency': emergency_mode,
|
||||
}
|
||||
|
||||
self.logger.debug('DAPNET POST URL: %s' % self.notify_url)
|
||||
self.logger.debug('DAPNET Payload: %s' % dumps(payload))
|
||||
|
||||
# Always call throttle before any remote server i/o is made
|
||||
self.throttle()
|
||||
try:
|
||||
r = requests.post(
|
||||
self.notify_url,
|
||||
data=dumps(payload),
|
||||
headers=headers,
|
||||
auth=HTTPBasicAuth(
|
||||
username=self.user, password=self.password),
|
||||
verify=self.verify_certificate,
|
||||
timeout=self.request_timeout,
|
||||
)
|
||||
if r.status_code != requests.codes.created:
|
||||
# We had a problem
|
||||
|
||||
self.logger.warning(
|
||||
'Failed to send DAPNET notification {} to {}: '
|
||||
'error={}.'.format(
|
||||
payload['text'],
|
||||
' to {}'.format(self.targets),
|
||||
r.status_code
|
||||
)
|
||||
)
|
||||
|
||||
self.logger.debug(
|
||||
'Response Details:\r\n{}'.format(r.content))
|
||||
|
||||
# Mark our failure
|
||||
has_error = True
|
||||
|
||||
else:
|
||||
self.logger.info(
|
||||
'Sent \'{}\' DAPNET notification {}'.format(
|
||||
payload['text'], 'to {}'.format(self.targets)
|
||||
)
|
||||
)
|
||||
|
||||
except requests.RequestException as e:
|
||||
self.logger.warning(
|
||||
'A Connection error occurred sending DAPNET '
|
||||
'notification to {}'.format(self.targets)
|
||||
)
|
||||
self.logger.debug('Socket Exception: %s' % str(e))
|
||||
|
||||
# Mark our failure
|
||||
has_error = True
|
||||
|
||||
return not has_error
|
||||
|
||||
def url(self, privacy=False, *args, **kwargs):
|
||||
"""
|
||||
Returns the URL built dynamically based on specified arguments.
|
||||
"""
|
||||
|
||||
# Define any URL parameters
|
||||
_map = {
|
||||
DapnetPriority.NORMAL: 'normal',
|
||||
DapnetPriority.EMERGENCY: 'emergency',
|
||||
}
|
||||
|
||||
# Define any URL parameters
|
||||
params = {
|
||||
'priority': 'normal' if self.priority not in _map
|
||||
else _map[self.priority],
|
||||
'batch': 'yes' if self.batch else 'no',
|
||||
'txgroups': ','.join(self.txgroups),
|
||||
}
|
||||
|
||||
# Extend our parameters
|
||||
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
|
||||
|
||||
# Setup Authentication
|
||||
auth = '{user}:{password}@'.format(
|
||||
user=NotifyDapnet.quote(self.user, safe=""),
|
||||
password=self.pprint(
|
||||
self.password, privacy, mode=PrivacyMode.Secret, safe=''
|
||||
),
|
||||
)
|
||||
|
||||
return '{schema}://{auth}{targets}?{params}'.format(
|
||||
schema=self.secure_protocol,
|
||||
auth=auth,
|
||||
targets='/'.join([self.pprint(x, privacy, safe='')
|
||||
for x in self.targets]),
|
||||
params=NotifyDapnet.urlencode(params),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def parse_url(url):
|
||||
"""
|
||||
Parses the URL and returns enough arguments that can allow
|
||||
us to re-instantiate this object.
|
||||
|
||||
"""
|
||||
results = NotifyBase.parse_url(url, verify_host=False)
|
||||
if not results:
|
||||
# We're done early as we couldn't load the results
|
||||
return results
|
||||
|
||||
# All elements are targets
|
||||
results['targets'] = [NotifyDapnet.unquote(results['host'])]
|
||||
|
||||
# All entries after the hostname are additional targets
|
||||
results['targets'].extend(NotifyDapnet.split_path(results['fullpath']))
|
||||
|
||||
# Support the 'to' variable so that we can support rooms this way too
|
||||
# The 'to' makes it easier to use yaml configuration
|
||||
if 'to' in results['qsd'] and len(results['qsd']['to']):
|
||||
results['targets'] += parse_call_sign(results['qsd']['to'])
|
||||
|
||||
# Check for priority
|
||||
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
|
||||
_map = {
|
||||
# Letter Assignments
|
||||
'n': DapnetPriority.NORMAL,
|
||||
'e': DapnetPriority.EMERGENCY,
|
||||
'no': DapnetPriority.NORMAL,
|
||||
'em': DapnetPriority.EMERGENCY,
|
||||
# Numeric assignments
|
||||
'0': DapnetPriority.NORMAL,
|
||||
'1': DapnetPriority.EMERGENCY,
|
||||
}
|
||||
try:
|
||||
results['priority'] = \
|
||||
_map[results['qsd']['priority'][0:2].lower()]
|
||||
|
||||
except KeyError:
|
||||
# No priority was set
|
||||
pass
|
||||
|
||||
# Check for one or multiple transmitter groups (comma separated)
|
||||
# and split them up, when necessary
|
||||
if 'txgroups' in results['qsd']:
|
||||
results['txgroups'] = \
|
||||
[x.lower() for x in
|
||||
NotifyDapnet.parse_list(results['qsd']['txgroups'])]
|
||||
|
||||
# Get Batch Mode Flag
|
||||
results['batch'] = \
|
||||
parse_bool(results['qsd'].get(
|
||||
'batch', NotifyDapnet.template_args['batch']['default']))
|
||||
|
||||
return results
|
@ -133,6 +133,17 @@ IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
|
||||
PHONE_NO_DETECTION_RE = re.compile(
|
||||
r'\s*([+(\s]*[0-9][0-9()\s-]+[0-9])(?=$|[\s,+(]+[0-9])', re.I)
|
||||
|
||||
# A simple verification check to make sure the content specified
|
||||
# rougly conforms to a ham radio call sign before we parse it further
|
||||
IS_CALL_SIGN = re.compile(
|
||||
r'^(?P<callsign>[a-z0-9]{2,3}[0-9][a-z0-9]{3})'
|
||||
r'(?P<ssid>-[a-z0-9]{1,2})?\s*$', re.I)
|
||||
|
||||
# Regular expression used to destinguish between multiple ham radio call signs
|
||||
CALL_SIGN_DETECTION_RE = re.compile(
|
||||
r'\s*([a-z0-9]{2,3}[0-9][a-z0-9]{3}(?:-[a-z0-9]{1,2})?)'
|
||||
r'(?=$|[\s,]+[a-z0-9]{4,6})', re.I)
|
||||
|
||||
# Regular expression used to destinguish between multiple URLs
|
||||
URL_DETECTION_RE = re.compile(
|
||||
r'([a-z0-9]+?:\/\/.*?)(?=$|[\s,]+[a-z0-9]{2,9}?:\/\/)', re.I)
|
||||
@ -372,6 +383,37 @@ def is_phone_no(phone, min_len=11):
|
||||
}
|
||||
|
||||
|
||||
def is_call_sign(callsign):
|
||||
"""Determine if the specified entry is a ham radio call sign
|
||||
|
||||
Args:
|
||||
callsign (str): The string you want to check.
|
||||
|
||||
Returns:
|
||||
bool: Returns False if the address specified is not a phone number
|
||||
"""
|
||||
|
||||
try:
|
||||
result = IS_CALL_SIGN.match(callsign)
|
||||
if not result:
|
||||
# not parseable content as it does not even conform closely to a
|
||||
# callsign
|
||||
return False
|
||||
|
||||
except TypeError:
|
||||
# not parseable content
|
||||
return False
|
||||
|
||||
ssid = result.group('ssid')
|
||||
return {
|
||||
# always treat call signs as uppercase content
|
||||
'callsign': result.group('callsign').upper(),
|
||||
# Prevent the storing of the None keyword in the event the SSID was
|
||||
# not detected
|
||||
'ssid': ssid if ssid else '',
|
||||
}
|
||||
|
||||
|
||||
def is_email(address):
|
||||
"""Determine if the specified entry is an email address
|
||||
|
||||
@ -766,6 +808,43 @@ def parse_phone_no(*args, **kwargs):
|
||||
return result
|
||||
|
||||
|
||||
def parse_call_sign(*args, **kwargs):
|
||||
"""
|
||||
Takes a string containing ham radio call signs separated by
|
||||
comma and/or spacesand returns a list.
|
||||
"""
|
||||
|
||||
# for Python 2.7 support, store_unparsable is not in the url above
|
||||
# as just parse_emails(*args, store_unparseable=True) since it is
|
||||
# an invalid syntax. This is the workaround to be backards compatible:
|
||||
store_unparseable = kwargs.get('store_unparseable', True)
|
||||
|
||||
result = []
|
||||
for arg in args:
|
||||
if isinstance(arg, six.string_types) and arg:
|
||||
_result = CALL_SIGN_DETECTION_RE.findall(arg)
|
||||
if _result:
|
||||
result += _result
|
||||
|
||||
elif not _result and store_unparseable:
|
||||
# we had content passed into us that was lost because it was
|
||||
# so poorly formatted that it didn't even come close to
|
||||
# meeting the regular expression we defined. We intentially
|
||||
# keep it as part of our result set so that parsing done
|
||||
# at a higher level can at least report this to the end user
|
||||
# and hopefully give them some indication as to what they
|
||||
# may have done wrong.
|
||||
result += \
|
||||
[x for x in filter(bool, re.split(STRING_DELIMITERS, arg))]
|
||||
|
||||
elif isinstance(arg, (set, list, tuple)):
|
||||
# Use recursion to handle the list of call signs
|
||||
result += parse_call_sign(
|
||||
*arg, store_unparseable=store_unparseable)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def parse_emails(*args, **kwargs):
|
||||
"""
|
||||
Takes a string containing emails separated by comma's and/or spaces and
|
||||
|
@ -47,7 +47,7 @@ Apprise is a Python package for simplifying access to all of the different
|
||||
notification services that are out there. Apprise opens the door and makes
|
||||
it easy to access:
|
||||
|
||||
Apprise API, AWS SES, AWS SNS, Boxcar, ClickSend, DingTalk, Discord, E-Mail,
|
||||
Apprise API, AWS SES, AWS SNS, Boxcar, ClickSend, DAPNET, DingTalk, Discord, E-Mail,
|
||||
Emby, Faast, FCM, Flock, Gitter, Google Chat, Gotify, Growl, Home Assistant,
|
||||
IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, MacOSX, Mailgun, Mattermost,
|
||||
Matrix, Microsoft Windows, Microsoft Teams, MessageBird, MQTT, MSG91, MyAndroid,
|
||||
|
8
setup.py
8
setup.py
@ -70,10 +70,10 @@ setup(
|
||||
cmdclass=cmdclass,
|
||||
url='https://github.com/caronc/apprise',
|
||||
keywords='Push Notifications Alerts Email AWS SES SNS Boxcar ClickSend '
|
||||
'Dingtalk Discord Dbus Emby Faast FCM Flock Gitter Gnome Google Chat '
|
||||
'Gotify Growl Home Assistant IFTTT Join Kavenegar KODI Kumulos '
|
||||
'LaMetric MacOS Mailgun Matrix Mattermost MessageBird MQTT MSG91 '
|
||||
'Nexmo Nextcloud Notica Notifico Office365 OneSignal Opsgenie '
|
||||
'DAPNET Dingtalk Discord Dbus Emby Faast FCM Flock Gitter Gnome '
|
||||
'Google Chat Gotify Growl Home Assistant IFTTT Join Kavenegar KODI '
|
||||
'Kumulos LaMetric MacOS Mailgun Matrix Mattermost MessageBird MQTT '
|
||||
'MSG91 Nexmo Nextcloud Notica Notifico Office365 OneSignal Opsgenie '
|
||||
'ParsePlatform PopcornNotify Prowl PushBullet Pushjet Pushed '
|
||||
'Pushover PushSafer Reddit Rocket.Chat Ryver SendGrid ServerChan '
|
||||
'SimplePush Sinch Slack SMTP2Go SparkPost Spontit Streamlabs '
|
||||
|
@ -230,6 +230,9 @@ class AppriseURLTester(object):
|
||||
assert False
|
||||
|
||||
if isinstance(obj, plugins.NotifyBase):
|
||||
# Ensure we are not performing any type of thorttling
|
||||
obj.request_rate_per_sec = 0
|
||||
|
||||
# We loaded okay; now lets make sure we can reverse
|
||||
# this url
|
||||
assert isinstance(obj.url(), six.string_types) is True
|
||||
|
122
test/test_plugin_dapnet.py
Normal file
122
test/test_plugin_dapnet.py
Normal file
@ -0,0 +1,122 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2021 Chris Caron <lead2gold@gmail.com>
|
||||
# All rights reserved.
|
||||
#
|
||||
# This code is licensed under the MIT License.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files(the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions :
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
# Disable logging for a cleaner testing output
|
||||
import logging
|
||||
import requests
|
||||
|
||||
from apprise import plugins
|
||||
from helpers import AppriseURLTester
|
||||
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
# Our Testing URLs
|
||||
apprise_url_tests = (
|
||||
('dapnet://', {
|
||||
# We failed to identify any valid authentication
|
||||
'instance': TypeError,
|
||||
}),
|
||||
('dapnet://:@/', {
|
||||
# We failed to identify any valid authentication
|
||||
'instance': TypeError,
|
||||
}),
|
||||
('dapnet://user:pass', {
|
||||
# No call-sign specified
|
||||
'instance': TypeError,
|
||||
}),
|
||||
('dapnet://user@host', {
|
||||
# No password specified
|
||||
'instance': TypeError,
|
||||
}),
|
||||
('dapnet://user:pass@{}'.format('DF1ABC'), {
|
||||
# valid call sign
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'requests_response_code': requests.codes.created,
|
||||
}),
|
||||
('dapnet://user:pass@{}/{}'.format('DF1ABC', 'DF1DEF'), {
|
||||
# valid call signs
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'requests_response_code': requests.codes.created,
|
||||
}),
|
||||
('dapnet://user:pass@?to={},{}'.format('DF1ABC', 'DF1DEF'), {
|
||||
# support the to= argument
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'requests_response_code': requests.codes.created,
|
||||
}),
|
||||
('dapnet://user:pass@{}?priority=normal'.format('DF1ABC'), {
|
||||
# valid call sign with priority
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'requests_response_code': requests.codes.created,
|
||||
}),
|
||||
('dapnet://user:pass@{}?priority=em&batch=false'.format(
|
||||
'/'.join(['DF1ABC', '0A1DEF'])), {
|
||||
# valid call sign with priority (emergency) + no batch
|
||||
# transmissions
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'requests_response_code': requests.codes.created,
|
||||
}),
|
||||
('dapnet://user:pass@{}?priority=invalid'.format('DF1ABC'), {
|
||||
# invalid priority
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'requests_response_code': requests.codes.created,
|
||||
}),
|
||||
('dapnet://user:pass@{}?txgroups=dl-all,all'.format('DF1ABC'), {
|
||||
# valid call sign with two transmitter groups
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'requests_response_code': requests.codes.created,
|
||||
}),
|
||||
('dapnet://user:pass@{}?txgroups=invalid'.format('DF1ABC'), {
|
||||
# valid call sign with invalid transmitter group
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'requests_response_code': requests.codes.created,
|
||||
}),
|
||||
('dapnet://user:pass@{}/{}'.format('abcdefghi', 'a'), {
|
||||
# invalid call signs
|
||||
'instance': plugins.NotifyDapnet,
|
||||
'notify_response': False,
|
||||
}),
|
||||
# Edge cases
|
||||
('dapnet://user:pass@{}'.format('DF1ABC'), {
|
||||
'instance': plugins.NotifyDapnet,
|
||||
# throw a bizzare code forcing us to fail to look it up
|
||||
'response': False,
|
||||
'requests_response_code': 999,
|
||||
}),
|
||||
('dapnet://user:pass@{}'.format('DF1ABC'), {
|
||||
'instance': plugins.NotifyDapnet,
|
||||
# Throws a series of connection and transfer exceptions when this flag
|
||||
# is set and tests that we gracfully handle them
|
||||
'test_requests_exceptions': True,
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
def test_plugin_dapnet_urls():
|
||||
"""
|
||||
NotifyDapnet() Apprise URLs
|
||||
|
||||
"""
|
||||
|
||||
# Run our general tests
|
||||
AppriseURLTester(tests=apprise_url_tests).run_all()
|
@ -751,6 +751,44 @@ def test_is_email():
|
||||
assert utils.is_email("Name <bademail>") is False
|
||||
|
||||
|
||||
def test_is_call_sign_no():
|
||||
"""
|
||||
API: is_call_sign() function
|
||||
|
||||
"""
|
||||
# Invalid numbers
|
||||
assert utils.is_call_sign(None) is False
|
||||
assert utils.is_call_sign(42) is False
|
||||
assert utils.is_call_sign(object) is False
|
||||
assert utils.is_call_sign('') is False
|
||||
assert utils.is_call_sign('1') is False
|
||||
assert utils.is_call_sign('12') is False
|
||||
assert utils.is_call_sign('abc') is False
|
||||
assert utils.is_call_sign('+()') is False
|
||||
assert utils.is_call_sign('+') is False
|
||||
assert utils.is_call_sign(None) is False
|
||||
assert utils.is_call_sign(42) is False
|
||||
|
||||
# To short or 2 long
|
||||
assert utils.is_call_sign('DF1AB') is False
|
||||
assert utils.is_call_sign('DF1ABCX') is False
|
||||
assert utils.is_call_sign('DF1ABCEFG') is False
|
||||
assert utils.is_call_sign('1ABCX') is False
|
||||
# 4th character is not an number
|
||||
assert utils.is_call_sign('XXXXXX') is False
|
||||
|
||||
# Some valid checks
|
||||
result = utils.is_call_sign('DF1ABC')
|
||||
assert isinstance(result, dict)
|
||||
assert 'DF1ABC' == result['callsign']
|
||||
assert '' == result['ssid']
|
||||
|
||||
# Get our SSID
|
||||
result = utils.is_call_sign('DF1ABC-14')
|
||||
assert 'DF1ABC' == result['callsign']
|
||||
assert '-14' == result['ssid']
|
||||
|
||||
|
||||
def test_is_phone_no():
|
||||
"""
|
||||
API: is_phone_no() function
|
||||
@ -861,6 +899,48 @@ def test_is_phone_no():
|
||||
assert '18001234567' == results['full']
|
||||
|
||||
|
||||
def test_parse_call_sign():
|
||||
"""utils: parse_call_sign() testing """
|
||||
# A simple single array entry (As str)
|
||||
results = utils.parse_call_sign('')
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 0
|
||||
|
||||
# just delimeters
|
||||
results = utils.parse_call_sign(', ,, , ,,, ')
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 0
|
||||
|
||||
results = utils.parse_call_sign(None)
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 0
|
||||
|
||||
results = utils.parse_call_sign(42)
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 0
|
||||
|
||||
results = utils.parse_call_sign('this is not a parseable call sign at all')
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 9
|
||||
|
||||
results = utils.parse_call_sign(
|
||||
'this is not a parseable call sign at all', store_unparseable=False)
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 0
|
||||
|
||||
# Now test valid call signs
|
||||
results = utils.parse_call_sign('0A1DEF')
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 1
|
||||
assert '0A1DEF' in results
|
||||
|
||||
results = utils.parse_call_sign('0A1DEF, DF1ABC')
|
||||
assert isinstance(results, list)
|
||||
assert len(results) == 2
|
||||
assert '0A1DEF' in results
|
||||
assert 'DF1ABC' in results
|
||||
|
||||
|
||||
def test_parse_phone_no():
|
||||
"""utils: parse_phone_no() testing """
|
||||
# A simple single array entry (As str)
|
||||
|
Loading…
Reference in New Issue
Block a user