From 4c542070ebdf10694d0c153e5bae933ba87dde1b Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Sun, 19 Feb 2023 14:05:43 -0500 Subject: [PATCH] Ntfy Authentication Token support added (#830) --- apprise/plugins/NotifyNtfy.py | 121 ++++++++++++++++++++++++++++++---- test/test_plugin_ntfy.py | 51 ++++++++++++++ 2 files changed, 161 insertions(+), 11 deletions(-) diff --git a/apprise/plugins/NotifyNtfy.py b/apprise/plugins/NotifyNtfy.py index dff8f824..7efe3487 100644 --- a/apprise/plugins/NotifyNtfy.py +++ b/apprise/plugins/NotifyNtfy.py @@ -74,6 +74,27 @@ NTFY_MODES = ( NtfyMode.PRIVATE, ) +# A Simple regular expression used to auto detect Auth mode if it isn't +# otherwise specified: +NTFY_AUTH_DETECT_RE = re.compile('tk_[^ \t]+', re.IGNORECASE) + + +class NtfyAuth: + """ + Define ntfy Authentication Modes + """ + # Basic auth (user and password provided) + BASIC = "basic" + + # Auth Token based + TOKEN = "token" + + +NTFY_AUTH = ( + NtfyAuth.BASIC, + NtfyAuth.TOKEN, +) + class NtfyPriority: """ @@ -170,6 +191,8 @@ class NotifyNtfy(NotifyBase): '{schema}://{user}@{host}:{port}/{targets}', '{schema}://{user}:{password}@{host}/{targets}', '{schema}://{user}:{password}@{host}:{port}/{targets}', + '{schema}://{token}@{host}/{targets}', + '{schema}://{token}@{host}:{port}/{targets}', ) # Define our template tokens @@ -193,6 +216,11 @@ class NotifyNtfy(NotifyBase): 'type': 'string', 'private': True, }, + 'token': { + 'name': _('Token'), + 'type': 'string', + 'private': True, + }, 'topic': { 'name': _('Topic'), 'type': 'string', @@ -253,6 +281,15 @@ class NotifyNtfy(NotifyBase): 'values': NTFY_MODES, 'default': NtfyMode.PRIVATE, }, + 'token': { + 'alias_of': 'token', + }, + 'auth': { + 'name': _('Authentication Type'), + 'type': 'choice:string', + 'values': NTFY_AUTH, + 'default': NtfyAuth.BASIC, + }, 'to': { 'alias_of': 'targets', }, @@ -260,7 +297,8 @@ class NotifyNtfy(NotifyBase): def __init__(self, targets=None, attach=None, filename=None, click=None, delay=None, email=None, priority=None, tags=None, mode=None, - include_image=True, avatar_url=None, **kwargs): + include_image=True, avatar_url=None, auth=None, token=None, + **kwargs): """ Initialize ntfy Object """ @@ -279,6 +317,17 @@ class NotifyNtfy(NotifyBase): # Show image associated with notification self.include_image = include_image + # Prepare our authentication type + self.auth = auth.strip().lower() \ + if isinstance(auth, str) \ + else self.template_args['auth']['default'] + + if self.auth not in NTFY_AUTH: + msg = 'An invalid ntfy Authentication type ({}) was specified.' \ + .format(auth) + self.logger.warning(msg) + raise TypeError(msg) + # Attach a file (URL supported) self.attach = attach @@ -294,6 +343,9 @@ class NotifyNtfy(NotifyBase): # An email to forward notifications to self.email = email + # Save our token + self.token = token + # The Priority of the message self.priority = NotifyNtfy.template_args['priority']['default'] \ if not priority else \ @@ -418,9 +470,17 @@ class NotifyNtfy(NotifyBase): else: # NotifyNtfy.PRVATE # Allow more settings to be applied now - if self.user: + if self.auth == NtfyAuth.BASIC and self.user: auth = (self.user, self.password) + elif self.auth == NtfyAuth.TOKEN: + if not self.token: + self.logger.warning('No Ntfy Token was specified') + return False, None + + # Set Token + headers['Authorization'] = f'Bearer {self.token}' + # Prepare our ntfy Template URL schema = 'https' if self.secure else 'http' @@ -575,6 +635,7 @@ class NotifyNtfy(NotifyBase): 'priority': self.priority, 'mode': self.mode, 'image': 'yes' if self.include_image else 'no', + 'auth': self.auth, } if self.avatar_url: @@ -599,15 +660,22 @@ class NotifyNtfy(NotifyBase): # Determine Authentication auth = '' - if self.user and self.password: - auth = '{user}:{password}@'.format( - user=NotifyNtfy.quote(self.user, safe=''), - password=self.pprint( - self.password, privacy, mode=PrivacyMode.Secret, safe=''), - ) - elif self.user: - auth = '{user}@'.format( - user=NotifyNtfy.quote(self.user, safe=''), + if self.auth == NtfyAuth.BASIC: + if self.user and self.password: + auth = '{user}:{password}@'.format( + user=NotifyNtfy.quote(self.user, safe=''), + password=self.pprint( + self.password, privacy, mode=PrivacyMode.Secret, + safe=''), + ) + elif self.user: + auth = '{user}@'.format( + user=NotifyNtfy.quote(self.user, safe=''), + ) + + elif self.token: # NtfyAuth.TOKEN also + auth = '{token}@'.format( + token=self.pprint(self.token, privacy, safe=''), ) if self.mode == NtfyMode.PRIVATE: @@ -689,6 +757,37 @@ class NotifyNtfy(NotifyBase): results['targets'] += \ NotifyNtfy.parse_list(results['qsd']['to']) + # Token Specified + if 'token' in results['qsd'] and len(results['qsd']['token']): + # Token presumed to be the one in use + results['auth'] = NtfyAuth.TOKEN + results['token'] = NotifyNtfy.unquote(results['qsd']['token']) + + # Auth override + if 'auth' in results['qsd'] and results['qsd']['auth']: + results['auth'] = NotifyNtfy.unquote( + results['qsd']['auth'].strip().lower()) + + if not results.get('auth') and results['user'] \ + and not results['password']: + # We can try to detect the authentication type on the formatting of + # the username. Look for tk_.* + # + # This isn't a surfire way to do things though; it's best to + # specify the auth= flag + results['auth'] = NtfyAuth.TOKEN \ + if NTFY_AUTH_DETECT_RE.match(results['user']) \ + else NtfyAuth.BASIC + + if results.get('auth') == NtfyAuth.TOKEN and not results.get('token'): + if results['user'] and not results['password']: + # Make sure we properly set our token + results['token'] = NotifyNtfy.unquote(results['user']) + + elif results['password']: + # Make sure we properly set our token + results['token'] = NotifyNtfy.unquote(results['password']) + # Mode override if 'mode' in results['qsd'] and results['qsd']['mode']: results['mode'] = NotifyNtfy.unquote( diff --git a/test/test_plugin_ntfy.py b/test/test_plugin_ntfy.py index 51953ed0..34d68700 100644 --- a/test/test_plugin_ntfy.py +++ b/test/test_plugin_ntfy.py @@ -99,6 +99,8 @@ apprise_url_tests = ( ('ntfy://user@localhost/topic/', { 'instance': NotifyNtfy, 'requests_response_text': GOOD_RESPONSE_TEXT, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'ntfy://user@localhost/topic', }), # Ntfy cloud mode (enforced) ('ntfy://ntfy.sh/topic1/topic2/', { @@ -165,10 +167,55 @@ apprise_url_tests = ( 'instance': NotifyNtfy, 'requests_response_text': GOOD_RESPONSE_TEXT, }), + # Auth Token Types (tk_ gets detected as a auth=token) + ('ntfy://tk_abcd123456@localhost/topic1', { + 'instance': NotifyNtfy, + 'requests_response_text': GOOD_RESPONSE_TEXT, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'ntfy://t...6@localhost/topic1', + }), + # Force an auth token since lack of tk_ prevents auto-detection + ('ntfy://abcd123456@localhost/topic1?auth=token', { + 'instance': NotifyNtfy, + 'requests_response_text': GOOD_RESPONSE_TEXT, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'ntfy://a...6@localhost/topic1', + }), + # Force an auth token since lack of tk_ prevents auto-detection + ('ntfy://:abcd123456@localhost/topic1?auth=token', { + 'instance': NotifyNtfy, + 'requests_response_text': GOOD_RESPONSE_TEXT, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'ntfy://a...6@localhost/topic1', + }), + # Token detection already implied when token keyword is set + ('ntfy://localhost/topic1?token=abc1234', { + 'instance': NotifyNtfy, + 'requests_response_text': GOOD_RESPONSE_TEXT, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'ntfy://a...4@localhost/topic1', + }), + # Token enforced, but since a user/pass provided, only the pass is kept + ('ntfy://user:token@localhost/topic1?auth=token', { + 'instance': NotifyNtfy, + 'requests_response_text': GOOD_RESPONSE_TEXT, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'ntfy://t...n@localhost/topic1', + }), + # Token mode force, but there was no token provided + ('ntfy://localhost/topic1?auth=token', { + 'instance': NotifyNtfy, + # We'll out-right fail to send the notification + 'response': False, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'ntfy://localhost/topic1', + }), # Priority ('ntfy://localhost/topic1/?priority=default', { 'instance': NotifyNtfy, 'requests_response_text': GOOD_RESPONSE_TEXT, + # Our expected url(privacy=True) startswith() response: + 'privacy_url': 'ntfy://localhost/topic1', }), # Priority higher ('ntfy://localhost/topic1/?priority=high', { @@ -213,6 +260,10 @@ apprise_url_tests = ( # Invalid mode 'instance': TypeError, }), + ('ntfys://token@localhost/topic/?auth=invalid', { + # Invalid Authentication type + 'instance': TypeError, + }), # Invalid hostname on localhost/private mode ('ntfys://user:web@-_/topic1/topic2/?mode=private', { 'instance': None,