From 20f27ace2ff7070c176d846d6923af8acb15f39f Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Sun, 8 Dec 2024 12:38:56 -0500 Subject: [PATCH 1/4] gmail oauth support initial commit --- apprise/plugins/gmail.py | 663 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 663 insertions(+) create mode 100644 apprise/plugins/gmail.py diff --git a/apprise/plugins/gmail.py b/apprise/plugins/gmail.py new file mode 100644 index 00000000..67d085da --- /dev/null +++ b/apprise/plugins/gmail.py @@ -0,0 +1,663 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2024, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +import requests +import base64 +import json +import time +from datetime import datetime +from datetime import timedelta +from .base import NotifyBase +from .. import exception +from ..url import PrivacyMode +from ..common import NotifyFormat +from ..common import NotifyType +from ..utils import is_email +from ..utils import parse_emails +from ..utils import validate_regex +from ..locale import gettext_lazy as _ +from ..common import PersistentStoreMode +from .email import NotifyEmail + + +class NotifyGMail(NotifyBase): + """ + A wrapper for GMail Notifications + """ + + # The default descriptive name associated with the Notification + service_name = 'GMail' + + # The services URL + service_url = 'https://mail.google.com/' + + # The default protocol + secure_protocol = 'gmail' + + # Allow 300 requests per minute. + # 60/300 = 0.2 + request_rate_per_sec = 0.20 + + # A URL that takes you to the setup/help of the specific protocol + setup_url = 'https://github.com/caronc/apprise/wiki/Notify_gmail' + + # Google OAuth2 URLs + auth_url = "https://oauth2.googleapis.com/device/code" + token_url = "https://oauth2.googleapis.com/token" + send_url = "https://gmail.googleapis.com/gmail/v1/users/me/messages/send" + + # The maximum number of seconds we will wait for our token to be acquired + token_acquisition_timeout = 6.0 + + # Required Scope + scope = "https://www.googleapis.com/auth/gmail.send" + + # Support attachments + attachment_support = True + + # Our default is to no not use persistent storage beyond in-memory + # reference + storage_mode = PersistentStoreMode.AUTO + + # Default Notify Format + notify_format = NotifyFormat.HTML + + # Define object templates + templates = ( + # Send as user (only supported method) + '{schema}://{user}@{client_id}/{secret}/{targets}', + ) + + # Define our template tokens + template_tokens = dict(NotifyBase.template_tokens, **{ + 'user': { + 'name': _('Username'), + 'type': 'string', + 'required': True, + }, + 'client_id': { + 'name': _('Client ID'), + 'type': 'string', + 'required': True, + 'private': True, + 'regex': (r'^[a-z0-9-]+$', 'i'), + }, + 'secret': { + 'name': _('Client Secret'), + 'type': 'string', + 'private': True, + 'required': True, + }, + 'target_email': { + 'name': _('Target Email'), + 'type': 'string', + 'map_to': 'targets', + }, + 'targets': { + 'name': _('Targets'), + 'type': 'list:string', + }, + }) + + # Define our template arguments + template_args = dict(NotifyBase.template_args, **{ + 'to': { + 'alias_of': 'targets', + }, + 'cc': { + 'name': _('Carbon Copy'), + 'type': 'list:string', + }, + 'bcc': { + 'name': _('Blind Carbon Copy'), + 'type': 'list:string', + }, + 'oauth_id': { + 'alias_of': 'client_id', + }, + 'oauth_secret': { + 'alias_of': 'secret', + }, + 'from': { + 'name': _('From Email'), + 'type': 'string', + 'map_to': 'from_addr', + }, + 'pgp': { + 'name': _('PGP Encryption'), + 'type': 'bool', + 'map_to': 'use_pgp', + 'default': False, + }, + 'pgpkey': { + 'name': _('PGP Public Key Path'), + 'type': 'string', + 'private': True, + # By default persistent storage is referenced + 'default': '', + 'map_to': 'pgp_key', + }, + }) + + # Define any kwargs we're using + template_kwargs = { + 'headers': { + 'name': _('Email Header'), + 'prefix': '+', + }, + } + + def __init__(self, client_id, secret, targets=None, cc=None, bcc=None, + from_addr=None, headers=None, use_pgp=None, pgp_key=None, + *kwargs): + """ + Initialize GMail Object + """ + super().__init__(**kwargs) + + # Client Key (associated with generated OAuth2 Login) + if not self.user: + msg = 'An invalid GMail User ' \ + '({}) was specified.'.format(self.user) + self.logger.warning(msg) + raise TypeError(msg) + + # Client Key (associated with generated OAuth2 Login) + self.client_id = validate_regex( + client_id, *self.template_tokens['client_id']['regex']) + if not self.client_id: + msg = 'An invalid GMail Client OAuth2 ID ' \ + '({}) was specified.'.format(client_id) + self.logger.warning(msg) + raise TypeError(msg) + + # Client Secret (associated with generated OAuth2 Login) + self.secret = validate_regex(secret) + if not self.secret: + msg = 'An invalid GMail Client OAuth2 Secret ' \ + '({}) was specified.'.format(secret) + self.logger.warning(msg) + raise TypeError(msg) + + # For tracking our email -> name lookups + self.names = {} + + self.headers = { + 'X-Application': self.app_id, + } + if headers: + # Store our extra headers + self.headers.update(headers) + + # Acquire Carbon Copies + self.cc = set() + + # Acquire Blind Carbon Copies + self.bcc = set() + + # Parse our targets + self.targets = list() + + for recipient in parse_emails(targets): + # Validate recipients (to:) and drop bad ones: + result = is_email(recipient) + if result: + # Add our email to our target list + self.targets.append( + (result['name'] if result['name'] else False, + result['full_email'])) + continue + + self.logger.warning( + 'Dropped invalid To email ({}) specified.' + .format(recipient)) + + # Validate recipients (cc:) and drop bad ones: + for recipient in parse_emails(cc): + email = is_email(recipient) + if email: + self.cc.add(email['full_email']) + + # Index our name (if one exists) + self.names[email['full_email']] = \ + email['name'] if email['name'] else False + continue + + self.logger.warning( + 'Dropped invalid Carbon Copy email ' + '({}) specified.'.format(recipient), + ) + + # Validate recipients (bcc:) and drop bad ones: + for recipient in parse_emails(bcc): + email = is_email(recipient) + if email: + self.bcc.add(email['full_email']) + + # Index our name (if one exists) + self.names[email['full_email']] = \ + email['name'] if email['name'] else False + continue + + self.logger.warning( + 'Dropped invalid Blind Carbon Copy email ' + '({}) specified.'.format(recipient), + ) + + # Our token is acquired upon a successful login + self.token = None + + # Presume that our token has expired 'now' + self.token_expiry = datetime.now() + + # Now we want to construct the To and From email + # addresses from the URL provided + self.from_addr = [False, ''] + + # pgp hash + self.pgp_public_keys = {} + + self.use_pgp = use_pgp if not None \ + else self.template_args['pgp']['default'] + + if from_addr: + result = is_email(from_addr) + if result: + self.from_addr = ( + result['name'] if result['name'] else False, + result['full_email']) + else: + # Only update the string but use the already detected info + self.from_addr[0] = from_addr + + else: # Default + self.from_addr[1] = f'{self.user}@gmail.com' + + result = is_email(self.from_addr[1]) + if not result: + # Parse Source domain based on from_addr + msg = 'Invalid ~From~ email specified: {}'.format( + '{} <{}>'.format(self.from_addr[0], self.from_addr[1]) + if self.from_addr[0] else '{}'.format(self.from_addr[1])) + self.logger.warning(msg) + raise TypeError(msg) + + # Store our lookup + self.names[self.from_addr[1]] = self.from_addr[0] + return + + def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, + **kwargs): + """ + Perform GMail Notification + """ + + # error tracking (used for function return) + has_error = False + + if not self.targets: + # There is no one to email; we're done + self.logger.warning( + 'There are no Email recipients to notify') + return False + + try: + for message in NotifyEmail.prepare_emails( + subject=title, body=body, notify_format=self.notify_format, + from_addr=self.from_addr, to=self.targets, + cc=self.cc, bcc=self.bcc, reply_to=self.reply_to, + smtp_host=self.smtp_host, + attach=attach, headers=self.headers, names=self.names, + pgp=self.use_pgp, pgp_path='TODO'): + + # Encode the message in base64 + payload = { + "raw": base64.urlsafe_b64encode( + message.as_bytes()).decode() + } + + # Perform upstream post + postokay, response = self._fetch( + url=self.send_url, payload=payload) + if not postokay: + has_error = True + + except exception.AppriseException as e: + self.logger.debug(f'Socket Exception: {e}') + + # Mark as failure + has_error = True + + return not has_error + + def authenticate(self): + """ + Logs into and acquires us an authentication token to work with + """ + + if self.token and self.token_expiry > datetime.now(): + # If we're already authenticated and our token is still valid + self.logger.debug( + 'Already authenticate with token {}'.format(self.token)) + return True + + # If we reach here, we've either expired, or we need to authenticate + # for the first time. + + # Prepare our payload + payload = { + "client_id": self.client_id, + "scope": self.scope, + } + + postokay, response = self._fetch( + url=self.auth_url, payload=payload, + content_type='application/x-www-form-urlencoded') + if not postokay: + return False + + # Reset our token + self.token = None + + # A device token is required to get our token + device_code = None + + try: + # Extract our time from our response and subtrace 10 seconds from + # it to give us some wiggle/grace people to re-authenticate if we + # need to + self.token_expiry = datetime.now() + \ + timedelta(seconds=int(response.get('expires_in')) - 10) + + except (ValueError, AttributeError, TypeError): + # ValueError: expires_in wasn't an integer + # TypeError: expires_in was None + # AttributeError: we could not extract anything from our response + # object. + return False + + # Go ahead and store our token if it's available + device_code = response.get('device_code') + + payload = { + "client_id": self.client_id, + "client_secret": self.secret, + "device_code": device_code, + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + } + + self.logger.debug( + 'Blocking until GMail token can be acquired ...') + + reference = datetime.now() + + while True: + postokay, response = self._fetch( + url=self.token_url, payload=payload) + + if postokay: + self.token = response.get("access_token") + break + + if response and response.get("error") == "authorization_pending": + # Our own throttle so we can abort eventually.... + elapsed = (datetime.now() - reference).total_seconds() + if elapsed >= self.token_acquisition_timeout: + self.logger.warning( + 'The GMail token could not be acquired') + break + + time.sleep(0.5) + continue + + # We failed + break + + # Return our success (if we were at all) + return True if self.token else False + + def _fetch(self, url, payload=None, headers=None, + content_type='application/json'): + """ + Wrapper to request object + + """ + + # Prepare our headers: + if not headers: + headers = { + 'User-Agent': self.app_id, + 'Content-Type': content_type, + } + + if self.token: + # Are we authenticated? + headers['Authorization'] = 'Bearer ' + self.token + + # Default content response object + content = {} + + # Some Debug Logging + self.logger.debug('GMail URL: {} (cert_verify={})'.format( + url, self.verify_certificate)) + self.logger.debug('GMail Payload: {}' .format(payload)) + + # Always call throttle before any remote server i/o is made + self.throttle() + + try: + r = requests.post( + url, + data=json.dumps(payload) + if content_type and content_type.endswith('/json') + else payload, + headers=headers, + verify=self.verify_certificate, + timeout=self.request_timeout, + ) + + if r.status_code not in ( + requests.codes.ok, requests.codes.created, + requests.codes.accepted): + + # We had a problem + status_str = \ + NotifyGMail.http_response_code_lookup(r.status_code) + + self.logger.warning( + 'Failed to send GMail to {}: ' + '{}error={}.'.format( + url, + ', ' if status_str else '', + r.status_code)) + + self.logger.debug( + 'Response Details:\r\n{}'.format(r.content)) + + # Mark our failure + return (False, content) + + try: + content = json.loads(r.content) + + except (AttributeError, TypeError, ValueError): + # ValueError = r.content is Unparsable + # TypeError = r.content is None + # AttributeError = r is None + content = {} + + except requests.RequestException as e: + self.logger.warning( + 'Exception received when sending GMail to {}: '. + format(url)) + self.logger.debug('Socket Exception: %s' % str(e)) + + # Mark our failure + return (False, content) + + return (True, content) + + @property + def url_identifier(self): + """ + Returns all of the identifiers that make this URL unique from + another simliar one. Targets or end points should never be identified + here. + """ + return (self.secure_protocol, self.user, self.client_id, self.secret) + + def url(self, privacy=False, *args, **kwargs): + """ + Returns the URL built dynamically based on specified arguments. + """ + + # Extend our parameters + params = self.url_parameters(privacy=privacy, *args, **kwargs) + + if self.cc: + # Handle our Carbon Copy Addresses + params['cc'] = ','.join( + ['{}{}'.format( + '' if not self.names.get(e) + else '{}:'.format(self.names[e]), e) for e in self.cc]) + + if self.bcc: + # Handle our Blind Carbon Copy Addresses + params['bcc'] = ','.join( + ['{}{}'.format( + '' if not self.names.get(e) + else '{}:'.format(self.names[e]), e) for e in self.bcc]) + + return '{schema}://{user}@{client_id}/{secret}' \ + '/{targets}/?{params}'.format( + schema=self.secure_protocol, + user=self.user, + client_id=self.pprint(self.client_id, privacy, safe=''), + secret=self.pprint( + self.secret, privacy, mode=PrivacyMode.Secret, + safe=''), + targets='/'.join( + [NotifyGMail.quote('{}{}'.format( + '' if not e[0] else '{}:'.format(e[0]), e[1]), + safe='@') for e in self.targets]), + params=NotifyGMail.urlencode(params)) + + def __len__(self): + """ + Returns the number of targets associated with this notification + """ + return len(self.targets) + + @staticmethod + def parse_url(url): + """ + Parses the URL and returns enough arguments that can allow + us to re-instantiate this object. + + """ + + results = NotifyBase.parse_url(url, verify_host=False) + if not results: + # We're done early as we couldn't load the results + return results + + # Now make a list of all our path entries + # We need to read each entry back one at a time in reverse order + # where each email found we mark as a target. Once we run out + # of targets, the presume the remainder of the entries are part + # of the secret key (since it can contain slashes in it) + entries = NotifyGMail.split_path(results['fullpath']) + + # Initialize our email + results['email'] = None + + # From Email + if 'from' in results['qsd'] and \ + len(results['qsd']['from']): + # Extract the sending account's information + results['source'] = \ + NotifyGMail.unquote(results['qsd']['from']) + + # OAuth2 ID + if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']): + # Extract the API Key from an argument + results['client_id'] = \ + NotifyGMail.unquote(results['qsd']['oauth_id']) + + elif entries: + # Get our client_id is the first entry on the path + results['client_id'] = NotifyGMail.unquote(entries.pop(0)) + + # + # Prepare our target listing + # + results['targets'] = list() + while entries: + # Pop the last entry + entry = NotifyGMail.unquote(entries.pop(-1)) + + if is_email(entry): + # Store our email and move on + results['targets'].append(entry) + continue + + # If we reach here, the entry we just popped is part of the secret + # key, so put it back + entries.append(NotifyGMail.quote(entry, safe='')) + + # We're done + break + + # OAuth2 Secret + if 'oauth_secret' in results['qsd'] and \ + len(results['qsd']['oauth_secret']): + # Extract the API Secret from an argument + results['secret'] = \ + NotifyGMail.unquote(results['qsd']['oauth_secret']) + + else: + # Assemble our secret key which is a combination of the host + # followed by all entries in the full path that follow up until + # the first email + results['secret'] = '/'.join( + [NotifyGMail.unquote(x) for x in entries]) + + # Support the 'to' variable so that we can support targets this way too + # The 'to' makes it easier to use yaml configuration + if 'to' in results['qsd'] and len(results['qsd']['to']): + results['targets'] += \ + NotifyGMail.parse_list(results['qsd']['to']) + + # Handle Carbon Copy Addresses + if 'cc' in results['qsd'] and len(results['qsd']['cc']): + results['cc'] = results['qsd']['cc'] + + # Handle Blind Carbon Copy Addresses + if 'bcc' in results['qsd'] and len(results['qsd']['bcc']): + results['bcc'] = results['qsd']['bcc'] + + return results From 76158bd747613ef106480513b9ef3628e7bd2879 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Sat, 14 Dec 2024 23:14:32 -0500 Subject: [PATCH 2/4] gmail support attempt started; email refactored --- apprise/logger.py | 13 ++ apprise/plugins/email/pgp.py | 354 +++++++++++++++++++++++++++++++ apprise/plugins/gmail.py | 392 +++++++++++++++++++++++++---------- 3 files changed, 652 insertions(+), 107 deletions(-) create mode 100644 apprise/plugins/email/pgp.py diff --git a/apprise/logger.py b/apprise/logger.py index d9efe47c..ede7b1b6 100644 --- a/apprise/logger.py +++ b/apprise/logger.py @@ -40,7 +40,11 @@ logging.TRACE = logging.DEBUG - 1 # from the command line. The idea here is to allow for deprecation notices logging.DEPRECATE = logging.ERROR + 1 +# Action Required Notices +logging.ACTION_REQUIRED = logging.ERROR + 2 + # Assign our Levels into our logging object +logging.addLevelName(logging.ACTION_REQUIRED, "ACTION REQUIRED") logging.addLevelName(logging.DEPRECATE, "DEPRECATION WARNING") logging.addLevelName(logging.TRACE, "TRACE") @@ -61,9 +65,18 @@ def deprecate(self, message, *args, **kwargs): self._log(logging.DEPRECATE, message, args, **kwargs) +def action_required(self, message, *args, **kwargs): + """ + Action Required Logging + """ + if self.isEnabledFor(logging.ACTION_REQUIRED): + self._log(logging.ACTION_REQUIRED, message, args, **kwargs) + + # Assign our Loggers for use in Apprise logging.Logger.trace = trace logging.Logger.deprecate = deprecate +logging.Logger.action_required = action_required # Create ourselve a generic (singleton) logging reference logger = logging.getLogger(LOGGER_NAME) diff --git a/apprise/plugins/email/pgp.py b/apprise/plugins/email/pgp.py new file mode 100644 index 00000000..fa446664 --- /dev/null +++ b/apprise/plugins/email/pgp.py @@ -0,0 +1,354 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2024, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import hashlib +from datetime import datetime +from datetime import timedelta +from datetime import timezone + +from ...asset import AppriseAsset +from ...apprise_attachment import AppriseAttachment +from ...logger import logger +from ...exception import ApprisePluginException + +try: + import pgpy + # Pretty Good Privacy (PGP) Support enabled + PGP_SUPPORT = True + +except ImportError: + # Pretty Good Privacy (PGP) Support disabled + PGP_SUPPORT = False + + +class ApprisePGPException(ApprisePluginException): + """ + Thrown when there is an error with the Pretty Good Privacy Controller + """ + def __init__(self, message, error_code=602): + super().__init__(message, error_code=error_code) + + +class ApprisePGPController: + """ + Pretty Good Privacy Controller Tool for the Apprise Library + """ + + # There is no reason a PGP Public Key should exceed 8K in size + # If it is more than this, then it is not accepted + max_pgp_public_key_size = 8000 + + def __init__(self, path, pub_keyfile=None, email=None, asset=None, + **kwargs): + """ + Path should be the directory keys can be written and read from such as + .store.path + + Optionally additionally specify a keyfile to explicitly open + """ + + # PGP hash + self.__key_lookup = {} + + # Directory we can work with + self.path = path + + # Our email + self.email = email + + # Prepare our Asset Object + self.asset = \ + asset if isinstance(asset, AppriseAsset) else AppriseAsset() + + if pub_keyfile: + # Create ourselves an Attachment to work with; this grants us the + # ability to pull this key from a remote site or anything else + # supported by the Attachment object + self._pub_keyfile = AppriseAttachment(asset=self.asset) + + # Add our definition to our pgp_key reference + self._pub_keyfile.add(pub_keyfile) + + # Enforce maximum file size + self._pub_keyfile[0].max_file_size = self.max_pgp_public_key_size + + else: + self._pub_keyfile = None + + def keygen(self, email=None, name=None, force=False): + """ + Generates a set of keys based on email configured. + """ + + try: + # Create a new RSA key pair with 2048-bit strength + key = pgpy.PGPKey.new( + pgpy.constants.PubKeyAlgorithm.RSAEncryptOrSign, 2048) + + except NameError: + # PGPy not installed + logger.debug('PGPy not installed; keygen disabled') + return False + + if self._pub_keyfile is not None or not self.path: + logger.trace( + 'PGP keygen disabled, reason=%s', + 'keyfile-defined' if self._pub_keyfile is not None + else 'no-write-path') + return False + + if not name: + name = self.asset.app_id + + if not email: + email = self.email + + # Prepare our UID + uid = pgpy.PGPUID.new(name, email=email) + + # Filenames + file_prefix = email.split('@')[0].lower() + + pub_path = os.path.join(self.path, f'{file_prefix}-pub.asc') + prv_path = os.path.join(self.path, f'{file_prefix}-prv.asc') + + if os.path.isfile(pub_path) and not force: + logger.debug( + 'PGP generation skipped; Public Key already exists: %s', + pub_path) + return True + + # Persistent Storage Key + lookup_key = hashlib.sha1( + os.path.abspath(pub_path).encode('utf-8')).hexdigest() + if lookup_key in self.__key_lookup: + # Ensure our key no longer exists + del self.__key_lookup[lookup_key] + + # Add the user ID to the key + key.add_uid(uid, usage={ + pgpy.constants.KeyFlags.Sign, + pgpy.constants.KeyFlags.EncryptCommunications}, + hashes=[pgpy.constants.HashAlgorithm.SHA256], + ciphers=[pgpy.constants.SymmetricKeyAlgorithm.AES256], + compression=[pgpy.constants.CompressionAlgorithm.ZLIB]) + + try: + # Write our keys to disk + with open(pub_path, 'w') as f: + f.write(str(key.pubkey)) + + except OSError as e: + logger.warning('Error writing PGP file %s', pub_path) + logger.debug(f'I/O Exception: {e}') + + # Cleanup + try: + os.unlink(pub_path) + logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + with open(prv_path, 'w') as f: + f.write(str(key)) + + except OSError as e: + logger.warning('Error writing PGP file %s', prv_path) + logger.debug(f'I/O Exception: {e}') + + try: + os.unlink(pub_path) + logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + os.unlink(prv_path) + logger.trace('Removed %s', prv_path) + + except OSError: + pass + + return False + + logger.info( + 'Wrote PGP Keys for %s/%s', + os.path.dirname(pub_path), + os.path.basename(pub_path)) + return True + + def public_keyfile(self, *emails): + """ + Returns the first match of a useable public key based emails provided + """ + + if not PGP_SUPPORT: + msg = 'PGP Support unavailable; install PGPy library' + logger.warning(msg) + raise ApprisePGPException(msg) + + if self._pub_keyfile is not None: + # If our code reaches here, then we fetch our public key + pgp_key = self._pub_keyfile[0] + if not pgp_key: + # We could not access the attachment + logger.error( + 'Could not access PGP Public Key {}.'.format( + pgp_key.url(privacy=True))) + return False + + return pgp_key.path + + elif not self.path: + # No path + return None + + fnames = [ + 'pgp-public.asc', + 'pgp-pub.asc', + 'public.asc', + 'pub.asc', + ] + + if self.email: + # Include our email in the list + emails = [self.email] + [*emails] + + for email in emails: + _entry = email.split('@')[0].lower() + fnames.insert(0, f'{_entry}-pub.asc') + + # Lowercase email (Highest Priority) + _entry = email.lower() + fnames.insert(0, f'{_entry}-pub.asc') + + return next( + (os.path.join(self.path, fname) + for fname in fnames + if os.path.isfile(os.path.join(self.path, fname))), + None) + + def public_key(self, *emails, autogen=None): + """ + Opens a spcified pgp public file and returns the key from it which + is used to encrypt the message + """ + path = self.public_keyfile(*emails) + if not path: + if (autogen if autogen is not None else self.asset.pgp_autogen) \ + and self.keygen(*emails): + path = self.public_keyfile(*emails) + if path: + # We should get a hit now + return self.public_key(*emails) + + logger.warning('No PGP Public Key could be loaded') + return None + + # Persistent Storage Key + key = hashlib.sha1( + os.path.abspath(path).encode('utf-8')).hexdigest() + if key in self.__key_lookup: + # Take an early exit + return self.__key_lookup[key]['public_key'] + + try: + with open(path, 'r') as key_file: + public_key, _ = pgpy.PGPKey.from_blob(key_file.read()) + + except NameError: + # PGPy not installed + logger.debug( + 'PGPy not installed; skipping PGP support: %s', path) + return None + + except FileNotFoundError: + # Generate keys + logger.debug('PGP Public Key file not found: %s', path) + return None + + except OSError as e: + logger.warning('Error accessing PGP Public Key file %s', path) + logger.debug(f'I/O Exception: {e}') + return None + + self.__key_lookup[key] = { + 'public_key': public_key, + 'expires': + datetime.now(timezone.utc) + timedelta(seconds=86400) + } + return public_key + + # Encrypt message using the recipient's public key + def encrypt(self, message, *emails): + """ + If provided a path to a pgp-key, content is encrypted + """ + + # Acquire our key + public_key = self.public_key(*emails) + if not public_key: + # Encryption not possible + return False + + try: + message_object = pgpy.PGPMessage.new(message) + encrypted_message = public_key.encrypt(message_object) + return str(encrypted_message) + + except pgpy.errors.PGPError: + # Encryption not Possible + logger.debug( + 'PGP Public Key Corruption; encryption not possible') + + except NameError: + # PGPy not installed + logger.debug('PGPy not installed; Skipping PGP encryption') + + return None + + def prune(self): + """ + Prunes old entries from the public_key index + """ + self.__key_lookup = { + key: value for key, value in self.__key_lookup.items() + if value['expires'] > datetime.now(timezone.utc)} + + @property + def pub_keyfile(self): + """ + Returns the Public Keyfile Path if set otherwise it returns None + This property returns False if a keyfile was provided, but was invalid + """ + return None if self._pub_keyfile is None else ( + False if not self._pub_keyfile[0] else self._pub_keyfile[0].path) diff --git a/apprise/plugins/gmail.py b/apprise/plugins/gmail.py index 67d085da..521d3066 100644 --- a/apprise/plugins/gmail.py +++ b/apprise/plugins/gmail.py @@ -33,6 +33,7 @@ from datetime import datetime from datetime import timedelta from .base import NotifyBase from .. import exception +from email.utils import formataddr from ..url import PrivacyMode from ..common import NotifyFormat from ..common import NotifyType @@ -41,7 +42,7 @@ from ..utils import parse_emails from ..utils import validate_regex from ..locale import gettext_lazy as _ from ..common import PersistentStoreMode -from .email import NotifyEmail +from . import email as _email class NotifyGMail(NotifyBase): @@ -58,6 +59,9 @@ class NotifyGMail(NotifyBase): # The default protocol secure_protocol = 'gmail' + # GMail SMTP Host (used for generating a Message-ID) + google_smtp_host = 'smtp.gmail.com' + # Allow 300 requests per minute. # 60/300 = 0.2 request_rate_per_sec = 0.20 @@ -66,12 +70,12 @@ class NotifyGMail(NotifyBase): setup_url = 'https://github.com/caronc/apprise/wiki/Notify_gmail' # Google OAuth2 URLs - auth_url = "https://oauth2.googleapis.com/device/code" + device_url = "https://oauth2.googleapis.com/device/code" token_url = "https://oauth2.googleapis.com/token" send_url = "https://gmail.googleapis.com/gmail/v1/users/me/messages/send" # The maximum number of seconds we will wait for our token to be acquired - token_acquisition_timeout = 6.0 + token_acquisition_timeout = 14.0 # Required Scope scope = "https://www.googleapis.com/auth/gmail.send" @@ -88,7 +92,7 @@ class NotifyGMail(NotifyBase): # Define object templates templates = ( - # Send as user (only supported method) + '{schema}://{user}@{client_id}/{secret}', '{schema}://{user}@{client_id}/{secret}/{targets}', ) @@ -104,7 +108,9 @@ class NotifyGMail(NotifyBase): 'type': 'string', 'required': True, 'private': True, - 'regex': (r'^[a-z0-9-]+$', 'i'), + # Generally looks like: + # 12345012-xxxxxxxxxxxxxxxxxxxxxxxxxxxx.apps.googleusercontent.com + 'regex': (r'^[a-z0-9-.]+$', 'i'), }, 'secret': { 'name': _('Client Secret'), @@ -136,10 +142,10 @@ class NotifyGMail(NotifyBase): 'name': _('Blind Carbon Copy'), 'type': 'list:string', }, - 'oauth_id': { + 'client_id': { 'alias_of': 'client_id', }, - 'oauth_secret': { + 'secret': { 'alias_of': 'secret', }, 'from': { @@ -161,6 +167,11 @@ class NotifyGMail(NotifyBase): 'default': '', 'map_to': 'pgp_key', }, + 'reply': { + 'name': _('Reply To'), + 'type': 'list:string', + 'map_to': 'reply_to', + }, }) # Define any kwargs we're using @@ -171,9 +182,9 @@ class NotifyGMail(NotifyBase): }, } - def __init__(self, client_id, secret, targets=None, cc=None, bcc=None, - from_addr=None, headers=None, use_pgp=None, pgp_key=None, - *kwargs): + def __init__(self, client_id, secret, targets=None, from_addr=None, + cc=None, bcc=None, reply_to=None, headers=None, + use_pgp=None, pgp_key=None, **kwargs): """ Initialize GMail Object """ @@ -206,9 +217,8 @@ class NotifyGMail(NotifyBase): # For tracking our email -> name lookups self.names = {} - self.headers = { - 'X-Application': self.app_id, - } + # Save our headers + self.headers = {} if headers: # Store our extra headers self.headers.update(headers) @@ -219,23 +229,12 @@ class NotifyGMail(NotifyBase): # Acquire Blind Carbon Copies self.bcc = set() + # Acquire Reply To + self.reply_to = set() + # Parse our targets self.targets = list() - for recipient in parse_emails(targets): - # Validate recipients (to:) and drop bad ones: - result = is_email(recipient) - if result: - # Add our email to our target list - self.targets.append( - (result['name'] if result['name'] else False, - result['full_email'])) - continue - - self.logger.warning( - 'Dropped invalid To email ({}) specified.' - .format(recipient)) - # Validate recipients (cc:) and drop bad ones: for recipient in parse_emails(cc): email = is_email(recipient) @@ -268,8 +267,25 @@ class NotifyGMail(NotifyBase): '({}) specified.'.format(recipient), ) + # Validate recipients (reply-to:) and drop bad ones: + for recipient in parse_emails(reply_to): + email = is_email(recipient) + if email: + self.reply_to.add(email['full_email']) + + # Index our name (if one exists) + self.names[email['full_email']] = \ + email['name'] if email['name'] else False + continue + + self.logger.warning( + 'Dropped invalid Reply To email ' + '({}) specified.'.format(recipient), + ) + # Our token is acquired upon a successful login self.token = None + self.refresh = None # Presume that our token has expired 'now' self.token_expiry = datetime.now() @@ -278,12 +294,6 @@ class NotifyGMail(NotifyBase): # addresses from the URL provided self.from_addr = [False, ''] - # pgp hash - self.pgp_public_keys = {} - - self.use_pgp = use_pgp if not None \ - else self.template_args['pgp']['default'] - if from_addr: result = is_email(from_addr) if result: @@ -294,7 +304,7 @@ class NotifyGMail(NotifyBase): # Only update the string but use the already detected info self.from_addr[0] = from_addr - else: # Default + else: # Send email to ourselves by default self.from_addr[1] = f'{self.user}@gmail.com' result = is_email(self.from_addr[1]) @@ -308,6 +318,40 @@ class NotifyGMail(NotifyBase): # Store our lookup self.names[self.from_addr[1]] = self.from_addr[0] + + if targets: + for recipient in parse_emails(targets): + # Validate recipients (to:) and drop bad ones: + result = is_email(recipient) + if result: + # Add our email to our target list + self.targets.append( + (result['name'] if result['name'] else False, + result['full_email'])) + continue + + self.logger.warning( + 'Dropped invalid To email ({}) specified.' + .format(recipient)) + else: + self.targets.append((False, self.from_addr[1])) + + # Prepare our Pretty Good Privacy Object + self.pgp = _email.pgp.ApprisePGPController( + path=self.store.path, pub_keyfile=pgp_key, + email=self.from_addr[1], asset=self.asset) + + # We store so we can generate a URL later on + self.pgp_key = pgp_key + + self.use_pgp = use_pgp if not None \ + else self.template_args['pgp']['default'] + + if self.use_pgp and not email.pgp.PGP_SUPPORT: + self.logger.warning( + 'PGP Support is not available on this installation; ' + 'ask admin to install PGPy') + return def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, @@ -325,19 +369,29 @@ class NotifyGMail(NotifyBase): 'There are no Email recipients to notify') return False + if not self.authenticate(): + self.logger.warning('Could not authenticate with the GMail') + return False + + # Prepare our headers + headers = { + 'X-Application': self.app_id, + } + headers.update(self.headers) + try: - for message in NotifyEmail.prepare_emails( + for message in _email.NotifyEmail.prepare_emails( subject=title, body=body, notify_format=self.notify_format, from_addr=self.from_addr, to=self.targets, cc=self.cc, bcc=self.bcc, reply_to=self.reply_to, - smtp_host=self.smtp_host, - attach=attach, headers=self.headers, names=self.names, - pgp=self.use_pgp, pgp_path='TODO'): + smtp_host=self.google_smtp_host, + attach=attach, headers=headers, names=self.names, + pgp=self.pgp): # Encode the message in base64 payload = { "raw": base64.urlsafe_b64encode( - message.as_bytes()).decode() + message.body.encode()).decode() } # Perform upstream post @@ -354,7 +408,28 @@ class NotifyGMail(NotifyBase): return not has_error - def authenticate(self): + # def authenticate(self): + # """ + # JWT Authentication + # """ + + # iat = time.time() + # exp = iat + 3600 # Token valid for 1 hour + + # payload = { + # # Issuer (service account email) + # "iss": self.from_addr[1], + # # Scopes for Gmail API + # "scope": self.scope, + # # Audience (token endpoint) + # "aud": self.token_url, + # # Expiration time + # "exp": exp, + # # Issued at time + # "iat": iat + # } + + def authenticate(self, timeout=None, long_poll=5.0, short_poll=2.0): """ Logs into and acquires us an authentication token to work with """ @@ -365,8 +440,79 @@ class NotifyGMail(NotifyBase): 'Already authenticate with token {}'.format(self.token)) return True + if not timeout: + # Save our default timeout + timeout = self.token_acquisition_timeout + + def token_store(response, save=True): + """ + Stores token data + """ + try: + # Extract our time from our response and subtrace 10 + # seconds from it to give us some wiggle/grace people to + # re-authenticate if we need to + self.token_expiry = datetime.now() + \ + timedelta(seconds=int(response.get('expires_in')) - 10) + + except (ValueError, AttributeError, TypeError): + # ValueError: expires_in wasn't an integer + # TypeError: expires_in was None + # AttributeError: we could not extract anything from our + # response object. + return False + + if save: + # store our content to disk + self.store.write( + json.dumps(response).encode('utf-8'), key='tokens') + + # Store our other tokens for fast access + self.token = response.get("access_token") + self.refresh = response.get("refresh_token") + return True + + # Read our content to see if it exists + try: + response = json.loads( + self.store.read(key='tokens').decode('utf-8')) + + except AttributeError: + # NoneType returned; nothing to decode. + response = None + + if response and token_store(response, save=False) and self.refresh: + if self.token_expiry > (datetime.now() - timedelta(days=20)): + # + # We have to refresh our token + # + payload = { + "client_id": self.client_id, + "client_secret": self.secret, + "refresh_token": self.refresh_token, + "grant_type": "refresh_token", + } + + postokay, response = self._fetch( + url=self.token_url, payload=payload) + if postokay and token_store(response): + # We were successful + return True + + elif self.token: + # we're good with the information we have + return True + + # # If we reach here, we've either expired, or we need to authenticate # for the first time. + # + # Reset our token + self.token = None + self.refresh = None + + # Reset our token cache file + self.store.delete('tokens') # Prepare our payload payload = { @@ -375,31 +521,14 @@ class NotifyGMail(NotifyBase): } postokay, response = self._fetch( - url=self.auth_url, payload=payload, - content_type='application/x-www-form-urlencoded') + url=self.device_url, payload=payload, + content_type=None) if not postokay: return False - # Reset our token - self.token = None - # A device token is required to get our token device_code = None - try: - # Extract our time from our response and subtrace 10 seconds from - # it to give us some wiggle/grace people to re-authenticate if we - # need to - self.token_expiry = datetime.now() + \ - timedelta(seconds=int(response.get('expires_in')) - 10) - - except (ValueError, AttributeError, TypeError): - # ValueError: expires_in wasn't an integer - # TypeError: expires_in was None - # AttributeError: we could not extract anything from our response - # object. - return False - # Go ahead and store our token if it's available device_code = response.get('device_code') @@ -419,19 +548,32 @@ class NotifyGMail(NotifyBase): postokay, response = self._fetch( url=self.token_url, payload=payload) - if postokay: - self.token = response.get("access_token") + if postokay and token_store(response): + # We were successful break - if response and response.get("error") == "authorization_pending": + if response and response.get("error") in ( + "authorization_pending", "slow_down"): + # Our own throttle so we can abort eventually.... elapsed = (datetime.now() - reference).total_seconds() - if elapsed >= self.token_acquisition_timeout: - self.logger.warning( - 'The GMail token could not be acquired') + remaining = \ + 0.0 if (timeout - elapsed) < 0.0 else (timeout - elapsed) + self.logger.action_required( + f"Visit \"{response['verification_url']}\" " + f"and enter code: {response['user_code']} " + f"- [remaining={remaining:.2f}sec]") + + if elapsed >= timeout: + self.logger.warning('GMail token could not be acquired') break - time.sleep(0.5) + # Throttle + time.sleep( + short_poll if response.get("error") != "slow_down" + else long_poll) + + # Loop and see if we were successful continue # We failed @@ -451,9 +593,13 @@ class NotifyGMail(NotifyBase): if not headers: headers = { 'User-Agent': self.app_id, - 'Content-Type': content_type, } + if content_type: + headers.update({ + 'Content-Type': content_type, + }) + if self.token: # Are we authenticated? headers['Authorization'] = 'Bearer ' + self.token @@ -480,6 +626,15 @@ class NotifyGMail(NotifyBase): timeout=self.request_timeout, ) + try: + content = json.loads(r.content) + + except (AttributeError, TypeError, ValueError): + # ValueError = r.content is Unparsable + # TypeError = r.content is None + # AttributeError = r is None + content = {} + if r.status_code not in ( requests.codes.ok, requests.codes.created, requests.codes.accepted): @@ -501,15 +656,6 @@ class NotifyGMail(NotifyBase): # Mark our failure return (False, content) - try: - content = json.loads(r.content) - - except (AttributeError, TypeError, ValueError): - # ValueError = r.content is Unparsable - # TypeError = r.content is None - # AttributeError = r is None - content = {} - except requests.RequestException as e: self.logger.warning( 'Exception received when sending GMail to {}: '. @@ -528,29 +674,58 @@ class NotifyGMail(NotifyBase): another simliar one. Targets or end points should never be identified here. """ - return (self.secure_protocol, self.user, self.client_id, self.secret) + return (self.secure_protocol, self.user, self.client_id, + self.secret) def url(self, privacy=False, *args, **kwargs): """ Returns the URL built dynamically based on specified arguments. """ + # Define an URL parameters + params = { + 'pgp': 'yes' if self.use_pgp else 'no', + } + + # Store our public key back into your URL + if self.pgp_key is not None: + params['pgp_key'] = NotifyGMail.quote(self.pgp_key, safe=':\\/') + + # Append our headers into our parameters + params.update({'+{}'.format(k): v for k, v in self.headers.items()}) + # Extend our parameters - params = self.url_parameters(privacy=privacy, *args, **kwargs) + params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) if self.cc: # Handle our Carbon Copy Addresses - params['cc'] = ','.join( - ['{}{}'.format( - '' if not self.names.get(e) - else '{}:'.format(self.names[e]), e) for e in self.cc]) + params['cc'] = ','.join([ + formataddr( + (self.names[e] if e in self.names else False, e), + # Swap comma for it's escaped url code (if detected) since + # we're using that as a delimiter + charset='utf-8').replace(',', '%2C') + for e in self.cc]) if self.bcc: # Handle our Blind Carbon Copy Addresses - params['bcc'] = ','.join( - ['{}{}'.format( - '' if not self.names.get(e) - else '{}:'.format(self.names[e]), e) for e in self.bcc]) + params['bcc'] = ','.join([ + formataddr( + (self.names[e] if e in self.names else False, e), + # Swap comma for it's escaped url code (if detected) since + # we're using that as a delimiter + charset='utf-8').replace(',', '%2C') + for e in self.bcc]) + + if self.reply_to: + # Handle our Reply-To Addresses + params['reply'] = ','.join([ + formataddr( + (self.names[e] if e in self.names else False, e), + # Swap comma for it's escaped url code (if detected) since + # we're using that as a delimiter + charset='utf-8').replace(',', '%2C') + for e in self.reply_to]) return '{schema}://{user}@{client_id}/{secret}' \ '/{targets}/?{params}'.format( @@ -591,6 +766,7 @@ class NotifyGMail(NotifyBase): # of targets, the presume the remainder of the entries are part # of the secret key (since it can contain slashes in it) entries = NotifyGMail.split_path(results['fullpath']) + entries.insert(0, NotifyGMail.unquote(results['host'])) # Initialize our email results['email'] = None @@ -603,22 +779,32 @@ class NotifyGMail(NotifyBase): NotifyGMail.unquote(results['qsd']['from']) # OAuth2 ID - if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']): + if 'client_id' in results['qsd'] and len(results['qsd']['client_id']): # Extract the API Key from an argument results['client_id'] = \ - NotifyGMail.unquote(results['qsd']['oauth_id']) + NotifyGMail.unquote(results['qsd']['client_id']) elif entries: # Get our client_id is the first entry on the path results['client_id'] = NotifyGMail.unquote(entries.pop(0)) + # OAuth2 Secret + if 'secret' in results['qsd'] and len(results['qsd']['secret']): + # Extract the API Key from an argument + results['secret'] = \ + NotifyGMail.unquote(results['qsd']['secret']) + + elif entries: + # Get our secret is the next entry on the path + results['secret'] = NotifyGMail.unquote(entries.pop(0)) + # # Prepare our target listing # results['targets'] = list() while entries: - # Pop the last entry - entry = NotifyGMail.unquote(entries.pop(-1)) + # Pop our remaining entries + entry = NotifyGMail.unquote(entries.pop()) if is_email(entry): # Store our email and move on @@ -629,23 +815,6 @@ class NotifyGMail(NotifyBase): # key, so put it back entries.append(NotifyGMail.quote(entry, safe='')) - # We're done - break - - # OAuth2 Secret - if 'oauth_secret' in results['qsd'] and \ - len(results['qsd']['oauth_secret']): - # Extract the API Secret from an argument - results['secret'] = \ - NotifyGMail.unquote(results['qsd']['oauth_secret']) - - else: - # Assemble our secret key which is a combination of the host - # followed by all entries in the full path that follow up until - # the first email - results['secret'] = '/'.join( - [NotifyGMail.unquote(x) for x in entries]) - # Support the 'to' variable so that we can support targets this way too # The 'to' makes it easier to use yaml configuration if 'to' in results['qsd'] and len(results['qsd']['to']): @@ -660,4 +829,13 @@ class NotifyGMail(NotifyBase): if 'bcc' in results['qsd'] and len(results['qsd']['bcc']): results['bcc'] = results['qsd']['bcc'] + # Handle Reply To Addresses + if 'reply' in results['qsd'] and len(results['qsd']['reply']): + results['reply_to'] = results['qsd']['reply'] + + # Add our Meta Headers that the user can provide with their outbound + # emails + results['headers'] = {NotifyBase.unquote(x): NotifyBase.unquote(y) + for x, y in results['qsd+'].items()} + return results From d508c934063e6c563307e608610742219ce39d6f Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Sun, 15 Dec 2024 11:22:14 -0500 Subject: [PATCH 3/4] slight update to logging --- apprise/plugins/gmail.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/apprise/plugins/gmail.py b/apprise/plugins/gmail.py index 521d3066..2f540537 100644 --- a/apprise/plugins/gmail.py +++ b/apprise/plugins/gmail.py @@ -639,16 +639,9 @@ class NotifyGMail(NotifyBase): requests.codes.ok, requests.codes.created, requests.codes.accepted): - # We had a problem - status_str = \ - NotifyGMail.http_response_code_lookup(r.status_code) - self.logger.warning( - 'Failed to send GMail to {}: ' - '{}error={}.'.format( - url, - ', ' if status_str else '', - r.status_code)) + 'Failed to send GMail to %s [error=%d]', + url, r.status_code) self.logger.debug( 'Response Details:\r\n{}'.format(r.content)) From 10dcb0bfa02f16234da435efac30b6a2892b3b43 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Sun, 15 Dec 2024 13:12:06 -0500 Subject: [PATCH 4/4] subtle changes --- apprise/plugins/email/pgp.py | 354 ----------------------------------- apprise/plugins/gmail.py | 4 +- 2 files changed, 1 insertion(+), 357 deletions(-) delete mode 100644 apprise/plugins/email/pgp.py diff --git a/apprise/plugins/email/pgp.py b/apprise/plugins/email/pgp.py deleted file mode 100644 index fa446664..00000000 --- a/apprise/plugins/email/pgp.py +++ /dev/null @@ -1,354 +0,0 @@ -# -*- coding: utf-8 -*- -# BSD 2-Clause License -# -# Apprise - Push Notification Library. -# Copyright (c) 2024, Chris Caron -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. - -import os -import hashlib -from datetime import datetime -from datetime import timedelta -from datetime import timezone - -from ...asset import AppriseAsset -from ...apprise_attachment import AppriseAttachment -from ...logger import logger -from ...exception import ApprisePluginException - -try: - import pgpy - # Pretty Good Privacy (PGP) Support enabled - PGP_SUPPORT = True - -except ImportError: - # Pretty Good Privacy (PGP) Support disabled - PGP_SUPPORT = False - - -class ApprisePGPException(ApprisePluginException): - """ - Thrown when there is an error with the Pretty Good Privacy Controller - """ - def __init__(self, message, error_code=602): - super().__init__(message, error_code=error_code) - - -class ApprisePGPController: - """ - Pretty Good Privacy Controller Tool for the Apprise Library - """ - - # There is no reason a PGP Public Key should exceed 8K in size - # If it is more than this, then it is not accepted - max_pgp_public_key_size = 8000 - - def __init__(self, path, pub_keyfile=None, email=None, asset=None, - **kwargs): - """ - Path should be the directory keys can be written and read from such as - .store.path - - Optionally additionally specify a keyfile to explicitly open - """ - - # PGP hash - self.__key_lookup = {} - - # Directory we can work with - self.path = path - - # Our email - self.email = email - - # Prepare our Asset Object - self.asset = \ - asset if isinstance(asset, AppriseAsset) else AppriseAsset() - - if pub_keyfile: - # Create ourselves an Attachment to work with; this grants us the - # ability to pull this key from a remote site or anything else - # supported by the Attachment object - self._pub_keyfile = AppriseAttachment(asset=self.asset) - - # Add our definition to our pgp_key reference - self._pub_keyfile.add(pub_keyfile) - - # Enforce maximum file size - self._pub_keyfile[0].max_file_size = self.max_pgp_public_key_size - - else: - self._pub_keyfile = None - - def keygen(self, email=None, name=None, force=False): - """ - Generates a set of keys based on email configured. - """ - - try: - # Create a new RSA key pair with 2048-bit strength - key = pgpy.PGPKey.new( - pgpy.constants.PubKeyAlgorithm.RSAEncryptOrSign, 2048) - - except NameError: - # PGPy not installed - logger.debug('PGPy not installed; keygen disabled') - return False - - if self._pub_keyfile is not None or not self.path: - logger.trace( - 'PGP keygen disabled, reason=%s', - 'keyfile-defined' if self._pub_keyfile is not None - else 'no-write-path') - return False - - if not name: - name = self.asset.app_id - - if not email: - email = self.email - - # Prepare our UID - uid = pgpy.PGPUID.new(name, email=email) - - # Filenames - file_prefix = email.split('@')[0].lower() - - pub_path = os.path.join(self.path, f'{file_prefix}-pub.asc') - prv_path = os.path.join(self.path, f'{file_prefix}-prv.asc') - - if os.path.isfile(pub_path) and not force: - logger.debug( - 'PGP generation skipped; Public Key already exists: %s', - pub_path) - return True - - # Persistent Storage Key - lookup_key = hashlib.sha1( - os.path.abspath(pub_path).encode('utf-8')).hexdigest() - if lookup_key in self.__key_lookup: - # Ensure our key no longer exists - del self.__key_lookup[lookup_key] - - # Add the user ID to the key - key.add_uid(uid, usage={ - pgpy.constants.KeyFlags.Sign, - pgpy.constants.KeyFlags.EncryptCommunications}, - hashes=[pgpy.constants.HashAlgorithm.SHA256], - ciphers=[pgpy.constants.SymmetricKeyAlgorithm.AES256], - compression=[pgpy.constants.CompressionAlgorithm.ZLIB]) - - try: - # Write our keys to disk - with open(pub_path, 'w') as f: - f.write(str(key.pubkey)) - - except OSError as e: - logger.warning('Error writing PGP file %s', pub_path) - logger.debug(f'I/O Exception: {e}') - - # Cleanup - try: - os.unlink(pub_path) - logger.trace('Removed %s', pub_path) - - except OSError: - pass - - try: - with open(prv_path, 'w') as f: - f.write(str(key)) - - except OSError as e: - logger.warning('Error writing PGP file %s', prv_path) - logger.debug(f'I/O Exception: {e}') - - try: - os.unlink(pub_path) - logger.trace('Removed %s', pub_path) - - except OSError: - pass - - try: - os.unlink(prv_path) - logger.trace('Removed %s', prv_path) - - except OSError: - pass - - return False - - logger.info( - 'Wrote PGP Keys for %s/%s', - os.path.dirname(pub_path), - os.path.basename(pub_path)) - return True - - def public_keyfile(self, *emails): - """ - Returns the first match of a useable public key based emails provided - """ - - if not PGP_SUPPORT: - msg = 'PGP Support unavailable; install PGPy library' - logger.warning(msg) - raise ApprisePGPException(msg) - - if self._pub_keyfile is not None: - # If our code reaches here, then we fetch our public key - pgp_key = self._pub_keyfile[0] - if not pgp_key: - # We could not access the attachment - logger.error( - 'Could not access PGP Public Key {}.'.format( - pgp_key.url(privacy=True))) - return False - - return pgp_key.path - - elif not self.path: - # No path - return None - - fnames = [ - 'pgp-public.asc', - 'pgp-pub.asc', - 'public.asc', - 'pub.asc', - ] - - if self.email: - # Include our email in the list - emails = [self.email] + [*emails] - - for email in emails: - _entry = email.split('@')[0].lower() - fnames.insert(0, f'{_entry}-pub.asc') - - # Lowercase email (Highest Priority) - _entry = email.lower() - fnames.insert(0, f'{_entry}-pub.asc') - - return next( - (os.path.join(self.path, fname) - for fname in fnames - if os.path.isfile(os.path.join(self.path, fname))), - None) - - def public_key(self, *emails, autogen=None): - """ - Opens a spcified pgp public file and returns the key from it which - is used to encrypt the message - """ - path = self.public_keyfile(*emails) - if not path: - if (autogen if autogen is not None else self.asset.pgp_autogen) \ - and self.keygen(*emails): - path = self.public_keyfile(*emails) - if path: - # We should get a hit now - return self.public_key(*emails) - - logger.warning('No PGP Public Key could be loaded') - return None - - # Persistent Storage Key - key = hashlib.sha1( - os.path.abspath(path).encode('utf-8')).hexdigest() - if key in self.__key_lookup: - # Take an early exit - return self.__key_lookup[key]['public_key'] - - try: - with open(path, 'r') as key_file: - public_key, _ = pgpy.PGPKey.from_blob(key_file.read()) - - except NameError: - # PGPy not installed - logger.debug( - 'PGPy not installed; skipping PGP support: %s', path) - return None - - except FileNotFoundError: - # Generate keys - logger.debug('PGP Public Key file not found: %s', path) - return None - - except OSError as e: - logger.warning('Error accessing PGP Public Key file %s', path) - logger.debug(f'I/O Exception: {e}') - return None - - self.__key_lookup[key] = { - 'public_key': public_key, - 'expires': - datetime.now(timezone.utc) + timedelta(seconds=86400) - } - return public_key - - # Encrypt message using the recipient's public key - def encrypt(self, message, *emails): - """ - If provided a path to a pgp-key, content is encrypted - """ - - # Acquire our key - public_key = self.public_key(*emails) - if not public_key: - # Encryption not possible - return False - - try: - message_object = pgpy.PGPMessage.new(message) - encrypted_message = public_key.encrypt(message_object) - return str(encrypted_message) - - except pgpy.errors.PGPError: - # Encryption not Possible - logger.debug( - 'PGP Public Key Corruption; encryption not possible') - - except NameError: - # PGPy not installed - logger.debug('PGPy not installed; Skipping PGP encryption') - - return None - - def prune(self): - """ - Prunes old entries from the public_key index - """ - self.__key_lookup = { - key: value for key, value in self.__key_lookup.items() - if value['expires'] > datetime.now(timezone.utc)} - - @property - def pub_keyfile(self): - """ - Returns the Public Keyfile Path if set otherwise it returns None - This property returns False if a keyfile was provided, but was invalid - """ - return None if self._pub_keyfile is None else ( - False if not self._pub_keyfile[0] else self._pub_keyfile[0].path) diff --git a/apprise/plugins/gmail.py b/apprise/plugins/gmail.py index 2f540537..f87d9d8d 100644 --- a/apprise/plugins/gmail.py +++ b/apprise/plugins/gmail.py @@ -37,9 +37,7 @@ from email.utils import formataddr from ..url import PrivacyMode from ..common import NotifyFormat from ..common import NotifyType -from ..utils import is_email -from ..utils import parse_emails -from ..utils import validate_regex +from ..utils.parse import is_email, parse_emails, validate_regex from ..locale import gettext_lazy as _ from ..common import PersistentStoreMode from . import email as _email