gmail support attempt started; email refactored

This commit is contained in:
Chris Caron 2024-12-14 23:14:32 -05:00
parent 20f27ace2f
commit 76158bd747
3 changed files with 652 additions and 107 deletions

View File

@ -40,7 +40,11 @@ logging.TRACE = logging.DEBUG - 1
# from the command line. The idea here is to allow for deprecation notices
logging.DEPRECATE = logging.ERROR + 1
# Action Required Notices
logging.ACTION_REQUIRED = logging.ERROR + 2
# Assign our Levels into our logging object
logging.addLevelName(logging.ACTION_REQUIRED, "ACTION REQUIRED")
logging.addLevelName(logging.DEPRECATE, "DEPRECATION WARNING")
logging.addLevelName(logging.TRACE, "TRACE")
@ -61,9 +65,18 @@ def deprecate(self, message, *args, **kwargs):
self._log(logging.DEPRECATE, message, args, **kwargs)
def action_required(self, message, *args, **kwargs):
"""
Action Required Logging
"""
if self.isEnabledFor(logging.ACTION_REQUIRED):
self._log(logging.ACTION_REQUIRED, message, args, **kwargs)
# Assign our Loggers for use in Apprise
logging.Logger.trace = trace
logging.Logger.deprecate = deprecate
logging.Logger.action_required = action_required
# Create ourselve a generic (singleton) logging reference
logger = logging.getLogger(LOGGER_NAME)

View File

@ -0,0 +1,354 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import hashlib
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from ...asset import AppriseAsset
from ...apprise_attachment import AppriseAttachment
from ...logger import logger
from ...exception import ApprisePluginException
try:
import pgpy
# Pretty Good Privacy (PGP) Support enabled
PGP_SUPPORT = True
except ImportError:
# Pretty Good Privacy (PGP) Support disabled
PGP_SUPPORT = False
class ApprisePGPException(ApprisePluginException):
"""
Thrown when there is an error with the Pretty Good Privacy Controller
"""
def __init__(self, message, error_code=602):
super().__init__(message, error_code=error_code)
class ApprisePGPController:
"""
Pretty Good Privacy Controller Tool for the Apprise Library
"""
# There is no reason a PGP Public Key should exceed 8K in size
# If it is more than this, then it is not accepted
max_pgp_public_key_size = 8000
def __init__(self, path, pub_keyfile=None, email=None, asset=None,
**kwargs):
"""
Path should be the directory keys can be written and read from such as
<notifyobject>.store.path
Optionally additionally specify a keyfile to explicitly open
"""
# PGP hash
self.__key_lookup = {}
# Directory we can work with
self.path = path
# Our email
self.email = email
# Prepare our Asset Object
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
if pub_keyfile:
# Create ourselves an Attachment to work with; this grants us the
# ability to pull this key from a remote site or anything else
# supported by the Attachment object
self._pub_keyfile = AppriseAttachment(asset=self.asset)
# Add our definition to our pgp_key reference
self._pub_keyfile.add(pub_keyfile)
# Enforce maximum file size
self._pub_keyfile[0].max_file_size = self.max_pgp_public_key_size
else:
self._pub_keyfile = None
def keygen(self, email=None, name=None, force=False):
"""
Generates a set of keys based on email configured.
"""
try:
# Create a new RSA key pair with 2048-bit strength
key = pgpy.PGPKey.new(
pgpy.constants.PubKeyAlgorithm.RSAEncryptOrSign, 2048)
except NameError:
# PGPy not installed
logger.debug('PGPy not installed; keygen disabled')
return False
if self._pub_keyfile is not None or not self.path:
logger.trace(
'PGP keygen disabled, reason=%s',
'keyfile-defined' if self._pub_keyfile is not None
else 'no-write-path')
return False
if not name:
name = self.asset.app_id
if not email:
email = self.email
# Prepare our UID
uid = pgpy.PGPUID.new(name, email=email)
# Filenames
file_prefix = email.split('@')[0].lower()
pub_path = os.path.join(self.path, f'{file_prefix}-pub.asc')
prv_path = os.path.join(self.path, f'{file_prefix}-prv.asc')
if os.path.isfile(pub_path) and not force:
logger.debug(
'PGP generation skipped; Public Key already exists: %s',
pub_path)
return True
# Persistent Storage Key
lookup_key = hashlib.sha1(
os.path.abspath(pub_path).encode('utf-8')).hexdigest()
if lookup_key in self.__key_lookup:
# Ensure our key no longer exists
del self.__key_lookup[lookup_key]
# Add the user ID to the key
key.add_uid(uid, usage={
pgpy.constants.KeyFlags.Sign,
pgpy.constants.KeyFlags.EncryptCommunications},
hashes=[pgpy.constants.HashAlgorithm.SHA256],
ciphers=[pgpy.constants.SymmetricKeyAlgorithm.AES256],
compression=[pgpy.constants.CompressionAlgorithm.ZLIB])
try:
# Write our keys to disk
with open(pub_path, 'w') as f:
f.write(str(key.pubkey))
except OSError as e:
logger.warning('Error writing PGP file %s', pub_path)
logger.debug(f'I/O Exception: {e}')
# Cleanup
try:
os.unlink(pub_path)
logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
with open(prv_path, 'w') as f:
f.write(str(key))
except OSError as e:
logger.warning('Error writing PGP file %s', prv_path)
logger.debug(f'I/O Exception: {e}')
try:
os.unlink(pub_path)
logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
os.unlink(prv_path)
logger.trace('Removed %s', prv_path)
except OSError:
pass
return False
logger.info(
'Wrote PGP Keys for %s/%s',
os.path.dirname(pub_path),
os.path.basename(pub_path))
return True
def public_keyfile(self, *emails):
"""
Returns the first match of a useable public key based emails provided
"""
if not PGP_SUPPORT:
msg = 'PGP Support unavailable; install PGPy library'
logger.warning(msg)
raise ApprisePGPException(msg)
if self._pub_keyfile is not None:
# If our code reaches here, then we fetch our public key
pgp_key = self._pub_keyfile[0]
if not pgp_key:
# We could not access the attachment
logger.error(
'Could not access PGP Public Key {}.'.format(
pgp_key.url(privacy=True)))
return False
return pgp_key.path
elif not self.path:
# No path
return None
fnames = [
'pgp-public.asc',
'pgp-pub.asc',
'public.asc',
'pub.asc',
]
if self.email:
# Include our email in the list
emails = [self.email] + [*emails]
for email in emails:
_entry = email.split('@')[0].lower()
fnames.insert(0, f'{_entry}-pub.asc')
# Lowercase email (Highest Priority)
_entry = email.lower()
fnames.insert(0, f'{_entry}-pub.asc')
return next(
(os.path.join(self.path, fname)
for fname in fnames
if os.path.isfile(os.path.join(self.path, fname))),
None)
def public_key(self, *emails, autogen=None):
"""
Opens a spcified pgp public file and returns the key from it which
is used to encrypt the message
"""
path = self.public_keyfile(*emails)
if not path:
if (autogen if autogen is not None else self.asset.pgp_autogen) \
and self.keygen(*emails):
path = self.public_keyfile(*emails)
if path:
# We should get a hit now
return self.public_key(*emails)
logger.warning('No PGP Public Key could be loaded')
return None
# Persistent Storage Key
key = hashlib.sha1(
os.path.abspath(path).encode('utf-8')).hexdigest()
if key in self.__key_lookup:
# Take an early exit
return self.__key_lookup[key]['public_key']
try:
with open(path, 'r') as key_file:
public_key, _ = pgpy.PGPKey.from_blob(key_file.read())
except NameError:
# PGPy not installed
logger.debug(
'PGPy not installed; skipping PGP support: %s', path)
return None
except FileNotFoundError:
# Generate keys
logger.debug('PGP Public Key file not found: %s', path)
return None
except OSError as e:
logger.warning('Error accessing PGP Public Key file %s', path)
logger.debug(f'I/O Exception: {e}')
return None
self.__key_lookup[key] = {
'public_key': public_key,
'expires':
datetime.now(timezone.utc) + timedelta(seconds=86400)
}
return public_key
# Encrypt message using the recipient's public key
def encrypt(self, message, *emails):
"""
If provided a path to a pgp-key, content is encrypted
"""
# Acquire our key
public_key = self.public_key(*emails)
if not public_key:
# Encryption not possible
return False
try:
message_object = pgpy.PGPMessage.new(message)
encrypted_message = public_key.encrypt(message_object)
return str(encrypted_message)
except pgpy.errors.PGPError:
# Encryption not Possible
logger.debug(
'PGP Public Key Corruption; encryption not possible')
except NameError:
# PGPy not installed
logger.debug('PGPy not installed; Skipping PGP encryption')
return None
def prune(self):
"""
Prunes old entries from the public_key index
"""
self.__key_lookup = {
key: value for key, value in self.__key_lookup.items()
if value['expires'] > datetime.now(timezone.utc)}
@property
def pub_keyfile(self):
"""
Returns the Public Keyfile Path if set otherwise it returns None
This property returns False if a keyfile was provided, but was invalid
"""
return None if self._pub_keyfile is None else (
False if not self._pub_keyfile[0] else self._pub_keyfile[0].path)

View File

@ -33,6 +33,7 @@ from datetime import datetime
from datetime import timedelta
from .base import NotifyBase
from .. import exception
from email.utils import formataddr
from ..url import PrivacyMode
from ..common import NotifyFormat
from ..common import NotifyType
@ -41,7 +42,7 @@ from ..utils import parse_emails
from ..utils import validate_regex
from ..locale import gettext_lazy as _
from ..common import PersistentStoreMode
from .email import NotifyEmail
from . import email as _email
class NotifyGMail(NotifyBase):
@ -58,6 +59,9 @@ class NotifyGMail(NotifyBase):
# The default protocol
secure_protocol = 'gmail'
# GMail SMTP Host (used for generating a Message-ID)
google_smtp_host = 'smtp.gmail.com'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.20
@ -66,12 +70,12 @@ class NotifyGMail(NotifyBase):
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_gmail'
# Google OAuth2 URLs
auth_url = "https://oauth2.googleapis.com/device/code"
device_url = "https://oauth2.googleapis.com/device/code"
token_url = "https://oauth2.googleapis.com/token"
send_url = "https://gmail.googleapis.com/gmail/v1/users/me/messages/send"
# The maximum number of seconds we will wait for our token to be acquired
token_acquisition_timeout = 6.0
token_acquisition_timeout = 14.0
# Required Scope
scope = "https://www.googleapis.com/auth/gmail.send"
@ -88,7 +92,7 @@ class NotifyGMail(NotifyBase):
# Define object templates
templates = (
# Send as user (only supported method)
'{schema}://{user}@{client_id}/{secret}',
'{schema}://{user}@{client_id}/{secret}/{targets}',
)
@ -104,7 +108,9 @@ class NotifyGMail(NotifyBase):
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[a-z0-9-]+$', 'i'),
# Generally looks like:
# 12345012-xxxxxxxxxxxxxxxxxxxxxxxxxxxx.apps.googleusercontent.com
'regex': (r'^[a-z0-9-.]+$', 'i'),
},
'secret': {
'name': _('Client Secret'),
@ -136,10 +142,10 @@ class NotifyGMail(NotifyBase):
'name': _('Blind Carbon Copy'),
'type': 'list:string',
},
'oauth_id': {
'client_id': {
'alias_of': 'client_id',
},
'oauth_secret': {
'secret': {
'alias_of': 'secret',
},
'from': {
@ -161,6 +167,11 @@ class NotifyGMail(NotifyBase):
'default': '',
'map_to': 'pgp_key',
},
'reply': {
'name': _('Reply To'),
'type': 'list:string',
'map_to': 'reply_to',
},
})
# Define any kwargs we're using
@ -171,9 +182,9 @@ class NotifyGMail(NotifyBase):
},
}
def __init__(self, client_id, secret, targets=None, cc=None, bcc=None,
from_addr=None, headers=None, use_pgp=None, pgp_key=None,
*kwargs):
def __init__(self, client_id, secret, targets=None, from_addr=None,
cc=None, bcc=None, reply_to=None, headers=None,
use_pgp=None, pgp_key=None, **kwargs):
"""
Initialize GMail Object
"""
@ -206,9 +217,8 @@ class NotifyGMail(NotifyBase):
# For tracking our email -> name lookups
self.names = {}
self.headers = {
'X-Application': self.app_id,
}
# Save our headers
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
@ -219,23 +229,12 @@ class NotifyGMail(NotifyBase):
# Acquire Blind Carbon Copies
self.bcc = set()
# Acquire Reply To
self.reply_to = set()
# Parse our targets
self.targets = list()
for recipient in parse_emails(targets):
# Validate recipients (to:) and drop bad ones:
result = is_email(recipient)
if result:
# Add our email to our target list
self.targets.append(
(result['name'] if result['name'] else False,
result['full_email']))
continue
self.logger.warning(
'Dropped invalid To email ({}) specified.'
.format(recipient))
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_emails(cc):
email = is_email(recipient)
@ -268,8 +267,25 @@ class NotifyGMail(NotifyBase):
'({}) specified.'.format(recipient),
)
# Validate recipients (reply-to:) and drop bad ones:
for recipient in parse_emails(reply_to):
email = is_email(recipient)
if email:
self.reply_to.add(email['full_email'])
# Index our name (if one exists)
self.names[email['full_email']] = \
email['name'] if email['name'] else False
continue
self.logger.warning(
'Dropped invalid Reply To email '
'({}) specified.'.format(recipient),
)
# Our token is acquired upon a successful login
self.token = None
self.refresh = None
# Presume that our token has expired 'now'
self.token_expiry = datetime.now()
@ -278,12 +294,6 @@ class NotifyGMail(NotifyBase):
# addresses from the URL provided
self.from_addr = [False, '']
# pgp hash
self.pgp_public_keys = {}
self.use_pgp = use_pgp if not None \
else self.template_args['pgp']['default']
if from_addr:
result = is_email(from_addr)
if result:
@ -294,7 +304,7 @@ class NotifyGMail(NotifyBase):
# Only update the string but use the already detected info
self.from_addr[0] = from_addr
else: # Default
else: # Send email to ourselves by default
self.from_addr[1] = f'{self.user}@gmail.com'
result = is_email(self.from_addr[1])
@ -308,6 +318,40 @@ class NotifyGMail(NotifyBase):
# Store our lookup
self.names[self.from_addr[1]] = self.from_addr[0]
if targets:
for recipient in parse_emails(targets):
# Validate recipients (to:) and drop bad ones:
result = is_email(recipient)
if result:
# Add our email to our target list
self.targets.append(
(result['name'] if result['name'] else False,
result['full_email']))
continue
self.logger.warning(
'Dropped invalid To email ({}) specified.'
.format(recipient))
else:
self.targets.append((False, self.from_addr[1]))
# Prepare our Pretty Good Privacy Object
self.pgp = _email.pgp.ApprisePGPController(
path=self.store.path, pub_keyfile=pgp_key,
email=self.from_addr[1], asset=self.asset)
# We store so we can generate a URL later on
self.pgp_key = pgp_key
self.use_pgp = use_pgp if not None \
else self.template_args['pgp']['default']
if self.use_pgp and not email.pgp.PGP_SUPPORT:
self.logger.warning(
'PGP Support is not available on this installation; '
'ask admin to install PGPy')
return
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
@ -325,19 +369,29 @@ class NotifyGMail(NotifyBase):
'There are no Email recipients to notify')
return False
if not self.authenticate():
self.logger.warning('Could not authenticate with the GMail')
return False
# Prepare our headers
headers = {
'X-Application': self.app_id,
}
headers.update(self.headers)
try:
for message in NotifyEmail.prepare_emails(
for message in _email.NotifyEmail.prepare_emails(
subject=title, body=body, notify_format=self.notify_format,
from_addr=self.from_addr, to=self.targets,
cc=self.cc, bcc=self.bcc, reply_to=self.reply_to,
smtp_host=self.smtp_host,
attach=attach, headers=self.headers, names=self.names,
pgp=self.use_pgp, pgp_path='TODO'):
smtp_host=self.google_smtp_host,
attach=attach, headers=headers, names=self.names,
pgp=self.pgp):
# Encode the message in base64
payload = {
"raw": base64.urlsafe_b64encode(
message.as_bytes()).decode()
message.body.encode()).decode()
}
# Perform upstream post
@ -354,7 +408,28 @@ class NotifyGMail(NotifyBase):
return not has_error
def authenticate(self):
# def authenticate(self):
# """
# JWT Authentication
# """
# iat = time.time()
# exp = iat + 3600 # Token valid for 1 hour
# payload = {
# # Issuer (service account email)
# "iss": self.from_addr[1],
# # Scopes for Gmail API
# "scope": self.scope,
# # Audience (token endpoint)
# "aud": self.token_url,
# # Expiration time
# "exp": exp,
# # Issued at time
# "iat": iat
# }
def authenticate(self, timeout=None, long_poll=5.0, short_poll=2.0):
"""
Logs into and acquires us an authentication token to work with
"""
@ -365,8 +440,79 @@ class NotifyGMail(NotifyBase):
'Already authenticate with token {}'.format(self.token))
return True
if not timeout:
# Save our default timeout
timeout = self.token_acquisition_timeout
def token_store(response, save=True):
"""
Stores token data
"""
try:
# Extract our time from our response and subtrace 10
# seconds from it to give us some wiggle/grace people to
# re-authenticate if we need to
self.token_expiry = datetime.now() + \
timedelta(seconds=int(response.get('expires_in')) - 10)
except (ValueError, AttributeError, TypeError):
# ValueError: expires_in wasn't an integer
# TypeError: expires_in was None
# AttributeError: we could not extract anything from our
# response object.
return False
if save:
# store our content to disk
self.store.write(
json.dumps(response).encode('utf-8'), key='tokens')
# Store our other tokens for fast access
self.token = response.get("access_token")
self.refresh = response.get("refresh_token")
return True
# Read our content to see if it exists
try:
response = json.loads(
self.store.read(key='tokens').decode('utf-8'))
except AttributeError:
# NoneType returned; nothing to decode.
response = None
if response and token_store(response, save=False) and self.refresh:
if self.token_expiry > (datetime.now() - timedelta(days=20)):
#
# We have to refresh our token
#
payload = {
"client_id": self.client_id,
"client_secret": self.secret,
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
}
postokay, response = self._fetch(
url=self.token_url, payload=payload)
if postokay and token_store(response):
# We were successful
return True
elif self.token:
# we're good with the information we have
return True
#
# If we reach here, we've either expired, or we need to authenticate
# for the first time.
#
# Reset our token
self.token = None
self.refresh = None
# Reset our token cache file
self.store.delete('tokens')
# Prepare our payload
payload = {
@ -375,31 +521,14 @@ class NotifyGMail(NotifyBase):
}
postokay, response = self._fetch(
url=self.auth_url, payload=payload,
content_type='application/x-www-form-urlencoded')
url=self.device_url, payload=payload,
content_type=None)
if not postokay:
return False
# Reset our token
self.token = None
# A device token is required to get our token
device_code = None
try:
# Extract our time from our response and subtrace 10 seconds from
# it to give us some wiggle/grace people to re-authenticate if we
# need to
self.token_expiry = datetime.now() + \
timedelta(seconds=int(response.get('expires_in')) - 10)
except (ValueError, AttributeError, TypeError):
# ValueError: expires_in wasn't an integer
# TypeError: expires_in was None
# AttributeError: we could not extract anything from our response
# object.
return False
# Go ahead and store our token if it's available
device_code = response.get('device_code')
@ -419,19 +548,32 @@ class NotifyGMail(NotifyBase):
postokay, response = self._fetch(
url=self.token_url, payload=payload)
if postokay:
self.token = response.get("access_token")
if postokay and token_store(response):
# We were successful
break
if response and response.get("error") == "authorization_pending":
if response and response.get("error") in (
"authorization_pending", "slow_down"):
# Our own throttle so we can abort eventually....
elapsed = (datetime.now() - reference).total_seconds()
if elapsed >= self.token_acquisition_timeout:
self.logger.warning(
'The GMail token could not be acquired')
remaining = \
0.0 if (timeout - elapsed) < 0.0 else (timeout - elapsed)
self.logger.action_required(
f"Visit \"{response['verification_url']}\" "
f"and enter code: {response['user_code']} "
f"- [remaining={remaining:.2f}sec]")
if elapsed >= timeout:
self.logger.warning('GMail token could not be acquired')
break
time.sleep(0.5)
# Throttle
time.sleep(
short_poll if response.get("error") != "slow_down"
else long_poll)
# Loop and see if we were successful
continue
# We failed
@ -451,9 +593,13 @@ class NotifyGMail(NotifyBase):
if not headers:
headers = {
'User-Agent': self.app_id,
'Content-Type': content_type,
}
if content_type:
headers.update({
'Content-Type': content_type,
})
if self.token:
# Are we authenticated?
headers['Authorization'] = 'Bearer ' + self.token
@ -480,6 +626,15 @@ class NotifyGMail(NotifyBase):
timeout=self.request_timeout,
)
try:
content = json.loads(r.content)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
content = {}
if r.status_code not in (
requests.codes.ok, requests.codes.created,
requests.codes.accepted):
@ -501,15 +656,6 @@ class NotifyGMail(NotifyBase):
# Mark our failure
return (False, content)
try:
content = json.loads(r.content)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
content = {}
except requests.RequestException as e:
self.logger.warning(
'Exception received when sending GMail to {}: '.
@ -528,29 +674,58 @@ class NotifyGMail(NotifyBase):
another simliar one. Targets or end points should never be identified
here.
"""
return (self.secure_protocol, self.user, self.client_id, self.secret)
return (self.secure_protocol, self.user, self.client_id,
self.secret)
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define an URL parameters
params = {
'pgp': 'yes' if self.use_pgp else 'no',
}
# Store our public key back into your URL
if self.pgp_key is not None:
params['pgp_key'] = NotifyGMail.quote(self.pgp_key, safe=':\\/')
# Append our headers into our parameters
params.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Extend our parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
if self.cc:
# Handle our Carbon Copy Addresses
params['cc'] = ','.join(
['{}{}'.format(
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.cc])
params['cc'] = ','.join([
formataddr(
(self.names[e] if e in self.names else False, e),
# Swap comma for it's escaped url code (if detected) since
# we're using that as a delimiter
charset='utf-8').replace(',', '%2C')
for e in self.cc])
if self.bcc:
# Handle our Blind Carbon Copy Addresses
params['bcc'] = ','.join(
['{}{}'.format(
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.bcc])
params['bcc'] = ','.join([
formataddr(
(self.names[e] if e in self.names else False, e),
# Swap comma for it's escaped url code (if detected) since
# we're using that as a delimiter
charset='utf-8').replace(',', '%2C')
for e in self.bcc])
if self.reply_to:
# Handle our Reply-To Addresses
params['reply'] = ','.join([
formataddr(
(self.names[e] if e in self.names else False, e),
# Swap comma for it's escaped url code (if detected) since
# we're using that as a delimiter
charset='utf-8').replace(',', '%2C')
for e in self.reply_to])
return '{schema}://{user}@{client_id}/{secret}' \
'/{targets}/?{params}'.format(
@ -591,6 +766,7 @@ class NotifyGMail(NotifyBase):
# of targets, the presume the remainder of the entries are part
# of the secret key (since it can contain slashes in it)
entries = NotifyGMail.split_path(results['fullpath'])
entries.insert(0, NotifyGMail.unquote(results['host']))
# Initialize our email
results['email'] = None
@ -603,22 +779,32 @@ class NotifyGMail(NotifyBase):
NotifyGMail.unquote(results['qsd']['from'])
# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
if 'client_id' in results['qsd'] and len(results['qsd']['client_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyGMail.unquote(results['qsd']['oauth_id'])
NotifyGMail.unquote(results['qsd']['client_id'])
elif entries:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyGMail.unquote(entries.pop(0))
# OAuth2 Secret
if 'secret' in results['qsd'] and len(results['qsd']['secret']):
# Extract the API Key from an argument
results['secret'] = \
NotifyGMail.unquote(results['qsd']['secret'])
elif entries:
# Get our secret is the next entry on the path
results['secret'] = NotifyGMail.unquote(entries.pop(0))
#
# Prepare our target listing
#
results['targets'] = list()
while entries:
# Pop the last entry
entry = NotifyGMail.unquote(entries.pop(-1))
# Pop our remaining entries
entry = NotifyGMail.unquote(entries.pop())
if is_email(entry):
# Store our email and move on
@ -629,23 +815,6 @@ class NotifyGMail(NotifyBase):
# key, so put it back
entries.append(NotifyGMail.quote(entry, safe=''))
# We're done
break
# OAuth2 Secret
if 'oauth_secret' in results['qsd'] and \
len(results['qsd']['oauth_secret']):
# Extract the API Secret from an argument
results['secret'] = \
NotifyGMail.unquote(results['qsd']['oauth_secret'])
else:
# Assemble our secret key which is a combination of the host
# followed by all entries in the full path that follow up until
# the first email
results['secret'] = '/'.join(
[NotifyGMail.unquote(x) for x in entries])
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
@ -660,4 +829,13 @@ class NotifyGMail(NotifyBase):
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
results['bcc'] = results['qsd']['bcc']
# Handle Reply To Addresses
if 'reply' in results['qsd'] and len(results['qsd']['reply']):
results['reply_to'] = results['qsd']['reply']
# Add our Meta Headers that the user can provide with their outbound
# emails
results['headers'] = {NotifyBase.unquote(x): NotifyBase.unquote(y)
for x, y in results['qsd+'].items()}
return results