Support for PGP Email Support

This commit is contained in:
Chris Caron 2024-09-16 21:50:20 -04:00
parent f35145e899
commit e8b2d4221d
3 changed files with 340 additions and 14 deletions

View File

@ -11,3 +11,6 @@ gntp
# Provides mqtt:// support
# use v1.x due to https://github.com/eclipse/paho.mqtt.python/issues/814
paho-mqtt < 2.0.0
# Pretty Good Privacy (PGP) Provides mailto:// and deltachat:// support
PGPy

View File

@ -27,6 +27,7 @@
# POSSIBILITY OF SUCH DAMAGE.
import dataclasses
import os
import re
import smtplib
import typing as t
@ -36,19 +37,30 @@ from email.mime.multipart import MIMEMultipart
from email.utils import formataddr, make_msgid
from email.header import Header
from email import charset
import hashlib
from socket import error as SocketError
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from .base import NotifyBase
from ..url import PrivacyMode
from ..common import NotifyFormat, NotifyType
from ..common import NotifyFormat, NotifyType, PersistentStoreMode
from ..conversion import convert_between
from ..utils import is_ipaddr, is_email, parse_emails, is_hostname
from ..utils import is_ipaddr, is_email, parse_emails, is_hostname, parse_bool
from ..locale import gettext_lazy as _
from ..logger import logger
try:
import pgpy
# Pretty Good Privacy (PGP) Support enabled
PGP_SUPPORT = True
except ImportError:
# Pretty Good Privacy (PGP) Support disabled
PGP_SUPPORT = False
# Globally Default encoding mode set to Quoted Printable.
charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')
@ -439,6 +451,12 @@ class NotifyEmail(NotifyBase):
'type': 'string',
'map_to': 'smtp_host',
},
'pgp': {
'name': _('PGP Encryption'),
'type': 'bool',
'map_to': 'use_pgp',
'default': False,
},
'mode': {
'name': _('Secure Mode'),
'type': 'choice:string',
@ -463,7 +481,7 @@ class NotifyEmail(NotifyBase):
def __init__(self, smtp_host=None, from_addr=None, secure_mode=None,
targets=None, cc=None, bcc=None, reply_to=None, headers=None,
**kwargs):
use_pgp=None, **kwargs):
"""
Initialize Email Object
@ -500,6 +518,17 @@ class NotifyEmail(NotifyBase):
self.smtp_host = \
smtp_host if isinstance(smtp_host, str) else ''
# pgp hash
self.pgp_public_keys = {}
self.use_pgp = use_pgp if not None \
else self.template_args['pgp']['default']
if self.use_pgp and not PGP_SUPPORT:
self.logger.warning(
'PGP Support is not available on this installation; '
'ask admin to install PGPy')
# Now detect secure mode
if secure_mode:
self.secure_mode = None \
@ -831,6 +860,12 @@ class NotifyEmail(NotifyBase):
mixed.attach(app)
base = mixed
if self.use_pgp:
# Apply our encryption
encrypted_content = self.pgp_encrypt_message(base.as_string())
if encrypted_content:
base = MIMEText(encrypted_content, "plain")
# Apply any provided custom headers
for k, v in self.headers.items():
base[k] = Header(v, self._get_charset(v))
@ -901,20 +936,21 @@ class NotifyEmail(NotifyBase):
message.to_addrs,
message.body)
self.logger.info(
f'Sent Email notification to "{message.recipient}".')
self.logger.info('Sent Email to %s', message.recipient)
except (SocketError, smtplib.SMTPException, RuntimeError) as e:
self.logger.warning(
f'Sending email to "{message.recipient}" failed. '
f'Reason: {e}')
'Sending email to "%s" failed.', message.recipient)
self.logger.debug(f'Socket Exception: {e}')
# Mark as failure
has_error = True
except (SocketError, smtplib.SMTPException, RuntimeError) as e:
self.logger.warning(
f'Connection error while submitting email to {self.smtp_host}.'
f' Reason: {e}')
'Connection error while submitting email to "%s"',
self.smtp_host)
self.logger.debug(f'Socket Exception: {e}')
# Mark as failure
has_error = True
@ -924,15 +960,224 @@ class NotifyEmail(NotifyBase):
if socket is not None: # pragma: no branch
socket.quit()
# Reduce our dictionary (eliminate expired keys if any)
self.pgp_public_keys = {
key: value for key, value in self.pgp_public_keys.items()
if value['expires'] > datetime.now(timezone.utc)}
return not has_error
def pgp_generate_keys(self, path=None):
"""
Generates a set of keys based on email configured
"""
if path is None:
if self.store.mode == PersistentStoreMode.MEMORY:
# Not possible - no write permissions
return False
# Set our path
path = self.store.path
try:
# Create a new RSA key pair with 2048-bit strength
key = pgpy.PGPKey.new(
pgpy.constants.PubKeyAlgorithm.RSAEncryptOrSign, 2048)
except NameError:
# PGPy not installed
self.logger.debug('PGPy not installed; ignoring PGP file: %s')
return False
# Prepare our uid
name, email = self.names[self.from_addr[1]], self.from_addr[1]
uid = pgpy.PGPUID.new(name, email=email)
# Filenames
file_prefix = email.split('@')[0].lower()
pub_path = os.path.join(path, f'{file_prefix}-pub.asc')
prv_path = os.path.join(path, f'{file_prefix}-prv.asc')
# Add the user ID to the key
key.add_uid(uid, usage={
pgpy.constants.KeyFlags.Sign,
pgpy.constants.KeyFlags.EncryptCommunications},
hashes=[pgpy.constants.HashAlgorithm.SHA256],
ciphers=[pgpy.constants.SymmetricKeyAlgorithm.AES256],
compression=[pgpy.constants.CompressionAlgorithm.ZLIB])
try:
# Write our keys to disk
with open(pub_path, 'w') as f:
f.write(str(key.pubkey))
except OSError as e:
self.logger.warning('Error writing PGP file %s', pub_path)
self.logger.debug(f'I/O Exception: {e}')
# Cleanup
try:
os.unlink(pub_path)
self.logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
with open(prv_path, 'w') as f:
f.write(str(key))
except OSError as e:
self.logger.warning('Error writing PGP file %s', prv_path)
self.logger.debug(f'I/O Exception: {e}')
try:
os.unlink(pub_path)
self.logger.trace('Removed %s', pub_path)
except OSError:
pass
try:
os.unlink(prv_path)
self.logger.trace('Removed %s', prv_path)
except OSError:
pass
return False
self.logger.info(
'Wrote PGP Keys for %s/%s',
os.path.dirname(pub_path),
os.path.basename(pub_path))
return True
@property
def pgp_fnames(self):
"""
Returns a list of filenames worth scanning for
"""
fnames = [
'pgp-public.asc',
'pgp-pub.asc',
'public.asc',
'pub.asc',
]
# Prepare our key files:
email = self.from_addr[1]
_entry = email.split('@')[0].lower()
if _entry not in fnames:
fnames.insert(0, f'{_entry}-pub.asc')
# Lowercase email (Highest Priority)
_entry = email.lower()
if _entry not in fnames:
fnames.insert(0, f'{_entry}-pub.asc')
return fnames
def pgp_public_key(self, path=None):
"""
Opens a spcified pgp public file and returns the key from it which
is used to encrypt the message
"""
if path is None:
path = next(
(os.path.join(self.store.path, fname)
for fname in self.pgp_fnames
if os.path.isfile(os.path.join(self.store.path, fname))),
None)
if not path:
if self.pgp_generate_keys(path=self.store.path):
path = next(
(os.path.join(self.store.path, fname)
for fname in self.pgp_fnames
if os.path.isfile(
os.path.join(self.store.path, fname))), None)
if path:
# We should get a hit now
return self.pgp_public_key(path=path)
self.logger.warning('No PGP Public Key could be loaded')
return None
if not isinstance(path, str):
raise AttributeError(
'Invalid path to PGP Public Key specified: %s: %s',
type(path), str(path))
# Persistent storage key:
ps_key = hashlib.sha1(
os.path.abspath(path).encode('utf-8')).hexdigest()
if ps_key in self.pgp_public_keys:
# Take an early exit
return self.pgp_public_keys[ps_key]['public_key']
try:
with open(path, 'r') as key_file:
public_key, _ = pgpy.PGPKey.from_blob(key_file.read())
except NameError:
# PGPy not installed
self.logger.debug(
'PGPy not installed; skipping PGP support: %s', path)
return None
except FileNotFoundError:
# Generate keys
self.logger.debug('PGP Public Key file not found: %s', path)
return None
except OSError as e:
self.logger.warning('Error accessing PGP Public Key file %s', path)
self.logger.debug(f'I/O Exception: {e}')
return None
self.store.set(ps_key, public_key, expires=86400)
self.pgp_public_keys[ps_key] = {
'public_key': public_key,
'expires':
datetime.now(timezone.utc) + timedelta(seconds=86400)
}
return public_key
# Encrypt message using the recipient's public key
def pgp_encrypt_message(self, message, path=None):
"""
If provided a path to a pgp-key, content is encrypted
"""
# Acquire our key
public_key = self.pgp_public_key(path=path)
if not public_key:
# Encryption not possible
return False
try:
message_object = pgpy.PGPMessage.new(message)
encrypted_message = public_key.encrypt(message_object)
return str(encrypted_message)
except NameError:
# PGPy not installed
self.logger.debug('PGPy not installed; Skipping PGP encryption')
return None
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define an URL parameters
params = {}
params = {
'pgp': 'yes' if self.use_pgp else 'no',
}
# Append our headers into our parameters
params.update({'+{}'.format(k): v for k, v in self.headers.items()})
@ -1044,7 +1289,7 @@ class NotifyEmail(NotifyBase):
"""
return (
self.secure_protocol if self.secure else self.protocol,
self.user, self.password, self.host,
self.user, self.password, self.host, self.smtp_host,
self.port if self.port
else SECURE_MODES[self.secure_mode]['default_port'],
)
@ -1053,8 +1298,7 @@ class NotifyEmail(NotifyBase):
"""
Returns the number of targets associated with this notification
"""
targets = len(self.targets)
return targets if targets > 0 else 1
return len(self.targets) if self.targets else 1
@staticmethod
def parse_url(url):
@ -1086,6 +1330,11 @@ class NotifyEmail(NotifyBase):
# value if invalid; we'll attempt to figure this out later on
results['host'] = ''
# Get PGP Flag
results['use_pgp'] = \
parse_bool(results['qsd'].get(
'pgp', NotifyEmail.template_args['pgp']['default']))
# The From address is a must; either through the use of templates
# from= entry and/or merging the user and hostname together, this
# must be calculated or parse_url will fail.

View File

@ -29,6 +29,7 @@
import logging
import pytest
import os
import sys
import re
from unittest import mock
from inspect import cleandoc
@ -40,6 +41,7 @@ from apprise import NotifyType, NotifyBase
from apprise import Apprise
from apprise import AttachBase
from apprise import AppriseAsset
from apprise import PersistentStoreMode
from apprise.config import ConfigBase
from apprise import AppriseAttachment
from apprise.plugins.email import NotifyEmail
@ -48,7 +50,6 @@ from apprise.plugins import email as NotifyEmailModule
# Disable logging for a cleaner testing output
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
@ -130,6 +131,10 @@ TEST_URLS = (
('mailtos://%20@domain.com?user=admin@mail-domain.com', {
'instance': NotifyEmail,
}),
('mailtos://%20@domain.com?user=admin@mail-domain.com?pgp=yes', {
# Test pgp flag
'instance': NotifyEmail,
}),
('mailtos://user:pass@nuxref.com:567/l2g@nuxref.com', {
'instance': NotifyEmail,
}),
@ -2049,3 +2054,72 @@ def test_plugin_email_by_ipaddr_1113(mock_smtp, mock_smtp_ssl):
assert email.smtp_host == '10.0.0.195'
assert email.port == 25
assert email.targets == [(False, 'alerts@example.com')]
@pytest.mark.skipif('pgpy' not in sys.modules, reason="Requires PGPy")
@mock.patch('smtplib.SMTP_SSL')
@mock.patch('smtplib.SMTP')
def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir):
"""
NotifyEmail() PGP Tests
"""
# Initialize our email (no from name)
obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes')
# Test our names
fnames = obj.pgp_fnames
assert isinstance(fnames, list)
# login is pgp
obj = Apprise.instantiate('mailto://pgp:pass@nuxref.com?pgp=yes')
# Test our names
fnames = obj.pgp_fnames
assert isinstance(fnames, list)
# login is pgp
obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes')
fnames = obj.pgp_fnames
assert isinstance(fnames, list)
# Attempt to generate keys
obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes')
# We're in memory mode
assert obj.store.mode == PersistentStoreMode.MEMORY
assert obj.pgp_generate_keys() is False
tmpdir1 = tmpdir.mkdir('tmp01')
# However explicitly setting a path works
assert obj.pgp_generate_keys(str(tmpdir1)) is True
tmpdir2 = tmpdir.mkdir('tmp02')
asset = AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir2),
)
obj = Apprise.instantiate(
'mailto://chris:pass@nuxref.com?pgp=yes', asset=asset)
assert obj.store.mode == PersistentStoreMode.FLUSH
assert obj.pgp_generate_keys() is True
# We do this again but even when we do a requisition for a public key
# it will generate a new pair or keys for us once it detects we don't
# have any
tmpdir3 = tmpdir.mkdir('tmp03')
asset = AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir3),
)
obj = Apprise.instantiate(
'mailto://chris:pass@nuxref.com?pgp=yes', asset=asset)
assert obj.store.mode == PersistentStoreMode.FLUSH
# We'll have a public key object to encrypt with
assert obj.pgp_public_key() is not None
encrypted = obj.pgp_encrypt_message("hello world")
assert encrypted.startswith('-----BEGIN PGP MESSAGE-----')
assert encrypted.rstrip().endswith('-----END PGP MESSAGE-----')