some refactoring

This commit is contained in:
Chris Caron 2024-09-17 21:32:10 -04:00
parent e8b2d4221d
commit 1a9d1dddd5
5 changed files with 163 additions and 35 deletions

View File

@ -44,6 +44,7 @@ from datetime import datetime
from datetime import timedelta from datetime import timedelta
from datetime import timezone from datetime import timezone
from ..apprise_attachment import AppriseAttachment
from .base import NotifyBase from .base import NotifyBase
from ..url import PrivacyMode from ..url import PrivacyMode
from ..common import NotifyFormat, NotifyType, PersistentStoreMode from ..common import NotifyFormat, NotifyType, PersistentStoreMode
@ -367,6 +368,10 @@ class NotifyEmail(NotifyBase):
# Support attachments # Support attachments
attachment_support = True attachment_support = True
# There is no reason a PGP Public Key should exceed 8K in size
# If it is more than this, then it is not accepted
max_pgp_public_key_size = 8000
# Default Notify Format # Default Notify Format
notify_format = NotifyFormat.HTML notify_format = NotifyFormat.HTML
@ -469,6 +474,14 @@ class NotifyEmail(NotifyBase):
'type': 'list:string', 'type': 'list:string',
'map_to': 'reply_to', 'map_to': 'reply_to',
}, },
'pgpkey': {
'name': _('PGP Public Key Path'),
'type': 'string',
'private': True,
# By default persistent storage is referenced
'default': '',
'map_to': 'pgp_key',
},
}) })
# Define any kwargs we're using # Define any kwargs we're using
@ -481,7 +494,7 @@ class NotifyEmail(NotifyBase):
def __init__(self, smtp_host=None, from_addr=None, secure_mode=None, def __init__(self, smtp_host=None, from_addr=None, secure_mode=None,
targets=None, cc=None, bcc=None, reply_to=None, headers=None, targets=None, cc=None, bcc=None, reply_to=None, headers=None,
use_pgp=None, **kwargs): use_pgp=None, pgp_key=None, **kwargs):
""" """
Initialize Email Object Initialize Email Object
@ -529,6 +542,18 @@ class NotifyEmail(NotifyBase):
'PGP Support is not available on this installation; ' 'PGP Support is not available on this installation; '
'ask admin to install PGPy') 'ask admin to install PGPy')
# Our template object is just an AppriseAttachment object
if pgp_key:
self.pgp_key = AppriseAttachment(asset=self.asset)
# Add our definition to our pgp_key reference
self.pgp_key.add(pgp_key)
# Enforce maximum file size
self.pgp_key[0].max_file_size = self.max_pgp_public_key_size
else:
# No key; use auto-generation
self.pgp_key = None
# Now detect secure mode # Now detect secure mode
if secure_mode: if secure_mode:
self.secure_mode = None \ self.secure_mode = None \
@ -861,6 +886,7 @@ class NotifyEmail(NotifyBase):
base = mixed base = mixed
if self.use_pgp: if self.use_pgp:
self.logger.debug("Securing email with PGP Encryption")
# Apply our encryption # Apply our encryption
encrypted_content = self.pgp_encrypt_message(base.as_string()) encrypted_content = self.pgp_encrypt_message(base.as_string())
if encrypted_content: if encrypted_content:
@ -1054,10 +1080,26 @@ class NotifyEmail(NotifyBase):
return True return True
@property @property
def pgp_fnames(self): def pgp_pubkey(self):
""" """
Returns a list of filenames worth scanning for Returns a list of filenames worth scanning for
""" """
if self.pgp_key is not None:
# If our code reaches here, then we fetch our public key
pgp_key = self.pgp_key[0]
if not pgp_key:
# We could not access the attachment
self.logger.error(
'Could not access PGP Public Key {}.'.format(
pgp_key.url(privacy=True)))
return False
return pgp_key.path
elif not self.store.path:
# No path
return None
fnames = [ fnames = [
'pgp-public.asc', 'pgp-public.asc',
'pgp-pub.asc', 'pgp-pub.asc',
@ -1077,7 +1119,11 @@ class NotifyEmail(NotifyBase):
if _entry not in fnames: if _entry not in fnames:
fnames.insert(0, f'{_entry}-pub.asc') fnames.insert(0, f'{_entry}-pub.asc')
return fnames return next(
(os.path.join(self.store.path, fname)
for fname in fnames
if os.path.isfile(os.path.join(self.store.path, fname))),
None)
def pgp_public_key(self, path=None): def pgp_public_key(self, path=None):
""" """
@ -1085,20 +1131,11 @@ class NotifyEmail(NotifyBase):
is used to encrypt the message is used to encrypt the message
""" """
if path is None: if path is None:
path = next( path = self.pgp_pubkey
(os.path.join(self.store.path, fname)
for fname in self.pgp_fnames
if os.path.isfile(os.path.join(self.store.path, fname))),
None)
if not path: if not path:
if self.pgp_generate_keys(path=self.store.path): if self.pgp_generate_keys(path=self.store.path):
path = next( path = self.pgp_pubkey
(os.path.join(self.store.path, fname)
for fname in self.pgp_fnames
if os.path.isfile(
os.path.join(self.store.path, fname))), None)
if path: if path:
# We should get a hit now # We should get a hit now
return self.pgp_public_key(path=path) return self.pgp_public_key(path=path)
@ -1138,7 +1175,6 @@ class NotifyEmail(NotifyBase):
self.logger.debug(f'I/O Exception: {e}') self.logger.debug(f'I/O Exception: {e}')
return None return None
self.store.set(ps_key, public_key, expires=86400)
self.pgp_public_keys[ps_key] = { self.pgp_public_keys[ps_key] = {
'public_key': public_key, 'public_key': public_key,
'expires': 'expires':
@ -1179,6 +1215,11 @@ class NotifyEmail(NotifyBase):
'pgp': 'yes' if self.use_pgp else 'no', 'pgp': 'yes' if self.use_pgp else 'no',
} }
# Store oure public key back into your URL
if self.pgp_key is not None:
params['pgp_key'] = NotifyEmail.quote(
self.pgp_key[0].url(privacy=privacy), safe=':')
# Append our headers into our parameters # Append our headers into our parameters
params.update({'+{}'.format(k): v for k, v in self.headers.items()}) params.update({'+{}'.format(k): v for k, v in self.headers.items()})
@ -1335,6 +1376,11 @@ class NotifyEmail(NotifyBase):
parse_bool(results['qsd'].get( parse_bool(results['qsd'].get(
'pgp', NotifyEmail.template_args['pgp']['default'])) 'pgp', NotifyEmail.template_args['pgp']['default']))
# Get PGP Public Key Override
if 'pgpkey' in results['qsd'] and results['qsd']['pgpkey']:
results['pgp_key'] = \
NotifyEmail.unquote(results['qsd']['pgpkey'])
# The From address is a must; either through the use of templates # The From address is a must; either through the use of templates
# from= entry and/or merging the user and hostname together, this # from= entry and/or merging the user and hostname together, this
# must be calculated or parse_url will fail. # must be calculated or parse_url will fail.

View File

@ -2068,27 +2068,42 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
# Initialize our email (no from name) # Initialize our email (no from name)
obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes') obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes')
# Test our names # Nothing to lookup
fnames = obj.pgp_fnames assert obj.pgp_pubkey is None
assert isinstance(fnames, list) assert obj.pgp_public_key() is None
assert obj.pgp_encrypt_message("message") is False
# login is pgp # Keys can not be generated in memory mode
obj = Apprise.instantiate('mailto://pgp:pass@nuxref.com?pgp=yes')
# Test our names
fnames = obj.pgp_fnames
assert isinstance(fnames, list)
# login is pgp
obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes')
fnames = obj.pgp_fnames
assert isinstance(fnames, list)
# Attempt to generate keys
obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes')
# We're in memory mode
assert obj.store.mode == PersistentStoreMode.MEMORY
assert obj.pgp_generate_keys() is False assert obj.pgp_generate_keys() is False
# The reason... no location to store data
assert obj.store.mode == PersistentStoreMode.MEMORY
tmpdir0 = tmpdir.mkdir('tmp00')
asset = AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
)
# Prepare PGP
obj = Apprise.instantiate(
'mailto://pgp:pass@nuxref.com?pgp=yes', asset=asset)
assert obj.store.mode == PersistentStoreMode.FLUSH
# Still no public key
assert obj.pgp_pubkey is None
assert obj.pgp_generate_keys() is True
# Now we'll have a public key
assert isinstance(obj.pgp_pubkey, str)
# Prepare PGP
obj = Apprise.instantiate(
f'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey={obj.pgp_pubkey}',
asset=asset)
# We will find our key
assert obj.pgp_public_key() is not None
tmpdir1 = tmpdir.mkdir('tmp01') tmpdir1 = tmpdir.mkdir('tmp01')
# However explicitly setting a path works # However explicitly setting a path works
assert obj.pgp_generate_keys(str(tmpdir1)) is True assert obj.pgp_generate_keys(str(tmpdir1)) is True

View File

@ -0,0 +1,18 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
xsBNBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7
XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0
rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0
p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq
ooooooooooooooooooooooooooooo6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r
uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAHNIUNocmlzIENhcm9uIDxs
ZWFkMmdvbGRAZ21haWwuooooooooooooooooooooooooooooogILCQIVCAIWAgIe
ARYhBEHHWtq4Kh8dGraFnkmZAD9B29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8
Sibwo7gL4ooooooooooooooooooooooooooooofjiEEW8gVQ4W2KDs74aCGkQtQJ
irvNA7WnuyMyXZyvhYa63U7GTk5RdVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd
0knhsmqdGTsjKuYdZ3Cooooooooooooooooooooooooooooo2GWBnvOQje+lQGIf
rE6TIwsf4QoKXSkTakzggbpZZl2hg2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiF
T9H/EmsNqlSKTTv1Aw4raCFZ+T/Ocsw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFso
yiZsjyu9xY0=
=ZY2q
-----END PGP PUBLIC KEY BLOCK-----

View File

@ -0,0 +1,31 @@
-----BEGIN PGP PRIVATE KEY BLOCK-----
xcLYBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7
XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0
rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0
p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq
N5xJBTrjhciDzU85Gb+bUecnoj9Oj6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r
uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAEAB/4tOpPqjqyo9I9Px6pn
Et+GRQqRTvxTIjuIc0MRkRaGxLdeahaI9bm8M2Y9158ed43Uy6zTsaXCDc+W9khr
iL9uInG5mFOqT/iUcf65b49wQVf9HJdT3Ncll+7uBoCW+KqMjHGA7z71TkN2/r8u
QQjzamY4gHInYE+BGgeZSfQ3t1MJMSdjQopPGDwSsco5hQJYtVH8K/0/Ig5S5E9Q
KD9ku3W9bCliERkljIwEbbyDv/vmbxPKdWW83T+UQK6CQhkH6h69EoMGQ76a6y/H
UuppNSpxuR4BiX4ZlcUyARrLluaRS0K1/OZCoScA0LLjY8pCEBoWT0uhhvy3t2xD
/bipBADS3bM3yGGZKtNgLPQx0BAyWk07OD3AlObykz4yTIc9DZj1bzhHKgAhwUAN
k7StwA22HoxMCKSoxhherZaXAQaJJOJKNXw3DphHCexrBq77nxBu3yo9UStj04Lx
tCEibclsQcwgh7TjjjDQdRYiirZvu9IGQBf27xKvTepibn7NnwQAxfcePfXsHza3
7CuJbxOGFaPf4ENSpFRYSZbH3dErtSlGDzz8e8jI0Ck9LQgp9MfjksWUwaMQbXdV
zNbQe1lAWQxtN9amVvEWvrAJhbhEU6RLsSjpZ9W5r3xAbfkoDg/icjbdoOqwI6LE
aTEhwaz+XZMLYJiT22AMJyC7TcL6fLcD/0nBRheQBqTsuYKimKI5yZ3ZdlGHfaLN
OqMGfuaEQCUAhSaXNliuP3XAWfiVXCaRw9De+Eod6DfGMGTTx8EVy7N4y4w3TORp
fFKaMGD3oiw1Eh63K1jV2yWPPpOnyc+YtXCPuGS+n/3CITc5cKxaPapQtCJA9Gw0
OaZ7ikUNs0Q9NbfNIUNocmlzIENhcm9uIDxsZWFkMmdvbGRAZ21haWwuY29tPsLA
ggQTAQgALAUCZujfJwIbBgILCQIVCAIWAgIeARYhBEHHWtq4Kh8dGraFnkmZAD9B
29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8Sibwo7gL4ayF4S3KhaKCYORcMM1o
e4pesy5ME6fjiEEW8gVQ4W2KDs74aCGkQtQJirvNA7WnuyMyXZyvhYa63U7GTk5R
dVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd0knhsmqdGTsjKuYdZ3CHED85pv/M
Owe0pyGOQKtJ1t9qwc6l2GWBnvOQje+lQGIfrE6TIwsf4QoKXSkTakzggbpZZl2h
g2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiFT9H/EmsNqlSKTTv1Aw4raCFZ+T/O
csw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFsoyiZsjyu9xY0=
=dBp6
-----END PGP PRIVATE KEY BLOCK-----

View File

@ -0,0 +1,18 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
xsBNBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7
XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0
rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0
p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq
N5xJBTrjhciDzU85Gb+bUecnoj9Oj6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r
uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAHNIUNocmlzIENhcm9uIDxs
ZWFkMmdvbGRAZ21haWwuY29tPsLAggQTAQgALAUCZujfJwIbBgILCQIVCAIWAgIe
ARYhBEHHWtq4Kh8dGraFnkmZAD9B29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8
Sibwo7gL4ayF4S3KhaKCYORcMM1oe4pesy5ME6fjiEEW8gVQ4W2KDs74aCGkQtQJ
irvNA7WnuyMyXZyvhYa63U7GTk5RdVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd
0knhsmqdGTsjKuYdZ3CHED85pv/MOwe0pyGOQKtJ1t9qwc6l2GWBnvOQje+lQGIf
rE6TIwsf4QoKXSkTakzggbpZZl2hg2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiF
T9H/EmsNqlSKTTv1Aw4raCFZ+T/Ocsw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFso
yiZsjyu9xY0=
=ZY2q
-----END PGP PUBLIC KEY BLOCK-----