diff --git a/.gitignore b/.gitignore index 162dbae9..5ff0d848 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ docs/doctrees/* .directory *.swp .idea +.tox/** # ignore demo attachments that user might have added helpdesk/attachments/ diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 3c8da6b2..00000000 --- a/.travis.yml +++ /dev/null @@ -1,26 +0,0 @@ -language: python -dist: focal # use LTS 20.04 - -python: - - "3.6" - - "3.7" - - "3.8" - -env: - - DJANGO=2.2.23 - - DJANGO=3.1.11 - - DJANGO=3.2.3 - -install: - - pip install -q Django==$DJANGO - - pip install -q -r requirements.txt - - pip install -q -r requirements-testing.txt - -before_script: - - "pycodestyle --exclude=migrations --ignore=E501,W503,W504 helpdesk" - -script: - - coverage run --source='.' quicktest.py helpdesk - -after_success: - - codecov diff --git a/Makefile b/Makefile index e71c5e2a..f34a0de5 100644 --- a/Makefile +++ b/Makefile @@ -56,6 +56,22 @@ test: mkdir -p var $(PIP) install -e .[test] $(TOX) + rm -rf var + + + +#: format - Run the PEP8 formatter. +.PHONY: format +format: + autopep8 --exit-code --global-config .flake8 helpdesk + isort --line-length=120 --src helpdesk . + + +#: checkformat - checks formatting against configured format specifications for the project. +.PHONY: checkformat +checkformat: + flake8 helpdesk --count --show-source --statistics --exit-zero --max-complexity=20 + isort --line-length=120 --src helpdesk . --check #: documentation - Build documentation (Sphinx, README, ...). diff --git a/README.rst b/README.rst index d7cb3237..9cfbd9a5 100644 --- a/README.rst +++ b/README.rst @@ -69,9 +69,30 @@ Django project. For further installation information see `docs/install.html` and `docs/configuration.html` +Developer Environment +--------------------- + +Follow these steps to set up your development environment to contribute to helpdesk: + - install a virtual environment + - using virtualenv from the helpdesk base folder do:: + virtualenv .venv && source .venv/bin/activate + + - install the requirements for development:: + pip install -r requirements.txt -r requirements-dev.txt + +To see option for the Makefile run: `make` + +The project enforces a standardized formatting in the CI/CD pipeline. To ensure you have the correct formatting run:: + make checkformat + +To auto format any code use this:: + make format + Testing ------- +From the command line you can run the tests using: `make test` + See `quicktest.py` for usage details. Upgrading from previous versions diff --git a/helpdesk/email.py b/helpdesk/email.py index 1a0b7d50..85dd07d4 100644 --- a/helpdesk/email.py +++ b/helpdesk/email.py @@ -7,6 +7,7 @@ See LICENSE for details. # import base64 + from bs4 import BeautifulSoup from datetime import timedelta from django.conf import settings as django_settings @@ -17,9 +18,11 @@ from django.db.models import Q from django.utils import encoding, timezone from django.utils.translation import gettext as _ import email +from email.message import Message from email.utils import getaddresses from email_reply_parser import EmailReplyParser from helpdesk import settings +from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException from helpdesk.lib import process_attachments, safe_template_context from helpdesk.models import FollowUp, IgnoreEmail, Queue, Ticket import imaplib @@ -34,6 +37,7 @@ import ssl import sys from time import ctime import typing +from typing import List, Tuple # import User model, which may be a custom model @@ -135,16 +139,23 @@ def pop3_sync(q, logger, server): else: full_message = encoding.force_str( "\n".join(raw_content), errors='replace') - ticket = object_from_message( - message=full_message, queue=q, logger=logger) - - if ticket: - server.dele(msgNum) - logger.info( - "Successfully processed message %s, deleted from POP3 server" % msgNum) - else: + try: + ticket = object_from_message(message=full_message, queue=q, logger=logger) + except IgnoreTicketException: logger.warn( - "Message %s was not successfully processed, and will be left on POP3 server" % msgNum) + "Message %s was ignored and will be left on POP3 server" % msgNum) + except DeleteIgnoredTicketException: + logger.warn( + "Message %s was ignored and deleted from POP3 server" % msgNum) + server.dele(msgNum) + else: + if ticket: + server.dele(msgNum) + logger.info( + "Successfully processed message %s, deleted from POP3 server" % msgNum) + else: + logger.warn( + "Message %s was not successfully processed, and will be left on POP3 server" % msgNum) server.quit() @@ -186,17 +197,23 @@ def imap_sync(q, logger, server): data = server.fetch(num, '(RFC822)')[1] full_message = encoding.force_str(data[0][1], errors='replace') try: - ticket = object_from_message( - message=full_message, queue=q, logger=logger) - except TypeError: - ticket = None # hotfix. Need to work out WHY. - if ticket: + ticket = object_from_message(message=full_message, queue=q, logger=logger) + except IgnoreTicketException: + logger.warn("Message %s was ignored and will be left on IMAP server" % num) + except DeleteIgnoredTicketException: server.store(num, '+FLAGS', '\\Deleted') - logger.info( - "Successfully processed message %s, deleted from IMAP server" % num) + logger.warn("Message %s was ignored and deleted from IMAP server" % num) + except TypeError as te: + # Log the error with stacktrace to help identify what went wrong + logger.error(f"Unexpected error processing message: {te}", exc_info=True) else: - logger.warn( - "Message %s was not successfully processed, and will be left on IMAP server" % num) + if ticket: + server.store(num, '+FLAGS', '\\Deleted') + logger.info( + "Successfully processed message %s, deleted from IMAP server" % num) + else: + logger.warn( + "Message %s was not successfully processed, and will be left on IMAP server" % num) except imaplib.IMAP4.error: logger.error( "IMAP retrieve failed. Is the folder '%s' spelled correctly, and does it exist on the server?", @@ -282,22 +299,28 @@ def process_queue(q, logger): logger.info("Processing message %d" % i) with open(m, 'r') as f: full_message = encoding.force_str(f.read(), errors='replace') - ticket = object_from_message( - message=full_message, queue=q, logger=logger) - if ticket: - logger.info( - "Successfully processed message %d, ticket/comment created.", i) try: - # delete message file if ticket was successful + ticket = object_from_message(message=full_message, queue=q, logger=logger) + except IgnoreTicketException: + logger.warn("Message %d was ignored and will be left in local directory", i) + except DeleteIgnoredTicketException: os.unlink(m) - except OSError as e: - logger.error( - "Unable to delete message %d (%s).", i, str(e)) + logger.warn("Message %d was ignored and deleted local directory", i) else: - logger.info("Successfully deleted message %d.", i) - else: - logger.warn( - "Message %d was not successfully processed, and will be left in local directory", i) + if ticket: + logger.info( + "Successfully processed message %d, ticket/comment created.", i) + try: + # delete message file if ticket was successful + os.unlink(m) + except OSError as e: + logger.error( + "Unable to delete message %d (%s).", i, str(e)) + else: + logger.info("Successfully deleted message %d.", i) + else: + logger.warn( + "Message %d was not successfully processed, and will be left in local directory", i) def decodeUnknown(charset, string): @@ -574,37 +597,124 @@ def get_email_body_from_part_payload(part) -> str: ) +def attempt_body_extract_from_html(message: str) -> str: + mail = BeautifulSoup(str(message), "html.parser") + beautiful_body = mail.find('body') + body = None + full_body = None + if beautiful_body: + try: + body = beautiful_body.text + full_body = body + except AttributeError: + pass + if not body: + body = "" + return body, full_body + + +def extract_part_data( + part: Message, + counter: int, + ticket_id: int, + files: List, + logger: logging.Logger +) -> Tuple[str, str]: + name = part.get_filename() + if name: + name = email.utils.collapse_rfc2231_value(name) + part_body = None + part_full_body = None + if part.get_content_maintype() == 'text' and name is None: + if part.get_content_subtype() == 'plain': + part_body = part.get_payload(decode=True) + # https://github.com/django-helpdesk/django-helpdesk/issues/732 + if part['Content-Transfer-Encoding'] == '8bit' and part.get_content_charset() == 'utf-8': + part_body = part_body.decode('unicode_escape') + part_body = decodeUnknown(part.get_content_charset(), part_body) + # have to use django_settings here so overwriting it works in tests + # the default value is False anyway + if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False): + # first message in thread, we save full body to avoid + # losing forwards and things like that + part_full_body = get_body_from_fragments(part_body) + part_body = EmailReplyParser.parse_reply(part_body) + else: + # second and other reply, save only first part of the + # message + part_body = EmailReplyParser.parse_reply(part_body) + part_full_body = part_body + # workaround to get unicode text out rather than escaped text + part_body = get_encoded_body(part_body) + logger.debug("Discovered plain text MIME part") + else: + email_body = get_email_body_from_part_payload(part) + + if not part_body and not part_full_body: + # no text has been parsed so far - try such deep parsing + # for some messages + altered_body = email_body.replace( + "

", "

\n").replace("{email_body}" + + payload = ( + '' + '' + '' + '' + '%s' + '' + ) % email_body + files.append( + SimpleUploadedFile( + _("email_html_body.html"), payload.encode("utf-8"), 'text/html') + ) + logger.debug("Discovered HTML MIME part") + else: + if not name: + ext = mimetypes.guess_extension(part.get_content_type()) + name = f"part-{counter}{ext}" + else: + name = f"part-{counter}_{name}" + + files.append(SimpleUploadedFile(name, part.get_payload(decode=True), mimetypes.guess_type(name)[0])) + logger.debug("Found MIME attachment %s", name) + return part_body, part_full_body + + def object_from_message(message: str, queue: Queue, logger: logging.Logger ) -> Ticket: - # 'message' must be an RFC822 formatted message. - message = email.message_from_string(message) + # 'message' must be an RFC822 formatted message to correctly parse. + message_obj = email.message_from_string(message) - subject = message.get('subject', _('Comment from e-mail')) + subject = message_obj.get('subject', _('Comment from e-mail')) subject = decode_mail_headers( - decodeUnknown(message.get_charset(), subject)) + decodeUnknown(message_obj.get_charset(), subject)) for affix in STRIPPED_SUBJECT_STRINGS: subject = subject.replace(affix, "") subject = subject.strip() - sender = message.get('from', _('Unknown Sender')) - sender = decode_mail_headers(decodeUnknown(message.get_charset(), sender)) - - # to address bug #832, we wrap all the text in front of the email address in - # double quotes by using replace() on the email string. Then, - # take first item of list, second item of tuple is the actual email address. - # Note that the replace won't work on just an email with no real name, - # but the getaddresses() function seems to be able to handle just unclosed quotes - # correctly. Not ideal, but this seems to work for now. - sender_email = email.utils.getaddresses( - ['\"' + sender.replace('<', '\" <')])[0][1] + # TODO: Should really be assigning a properly formatted fake email. + # Check if anything relies on this being a "real name" formatted string if no sender is found on message_obj. + # Also not sure it should be accepting emails from unknown senders + sender_email = _('Unknown Sender') + sender_hdr = message_obj.get('from') + if sender_hdr: + # Parse the header which extracts the first email address in the list if more than one + # The parseaddr method returns a tuple in the form + # Only need the actual email address from the tuple not the "real name" + # Since the spec requires that all email addresses are ASCII, they will not be encoded + sender_email = email.utils.parseaddr(sender_hdr)[1] for ignore in IgnoreEmail.objects.filter(Q(queues=queue) | Q(queues__isnull=True)): if ignore.test(sender_email): - # By returning 'False' the message will be kept in the mailbox, - # and the 'True' will cause the message to be deleted. - return not ignore.keep_in_mailbox + raise IgnoreTicketException() if ignore.keep_in_mailbox else DeleteIgnoredTicketException() ticket_id: typing.Optional[int] = get_ticket_id_from_subject_slug( queue.slug, @@ -617,108 +727,23 @@ def object_from_message(message: str, counter = 0 files = [] - for part in message.walk(): + for part in message_obj.walk(): if part.get_content_maintype() == 'multipart': continue - - name = part.get_param("name") - if name: - name = email.utils.collapse_rfc2231_value(name) - - if part.get_content_maintype() == 'text' and name is None: - if part.get_content_subtype() == 'plain': - body = part.get_payload(decode=True) - # https://github.com/django-helpdesk/django-helpdesk/issues/732 - if part['Content-Transfer-Encoding'] == '8bit' and part.get_content_charset() == 'utf-8': - body = body.decode('unicode_escape') - body = decodeUnknown(part.get_content_charset(), body) - # have to use django_settings here so overwritting it works in tests - # the default value is False anyway - if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False): - # first message in thread, we save full body to avoid - # losing forwards and things like that - full_body = get_body_from_fragments(body) - body = EmailReplyParser.parse_reply(body) - else: - # second and other reply, save only first part of the - # message - body = EmailReplyParser.parse_reply(body) - full_body = body - # workaround to get unicode text out rather than escaped text - body = get_encoded_body(body) - logger.debug("Discovered plain text MIME part") - else: - email_body = get_email_body_from_part_payload(part) - - if not body and not full_body: - # no text has been parsed so far - try such deep parsing - # for some messages - altered_body = email_body.replace( - "

", "

\n").replace("{email_body}" - - payload = ( - '' - '' - '' - '' - '%s' - '' - ) % email_body - files.append( - SimpleUploadedFile( - _("email_html_body.html"), payload.encode("utf-8"), 'text/html') - ) - logger.debug("Discovered HTML MIME part") - else: - if not name: - ext = mimetypes.guess_extension(part.get_content_type()) - name = "part-%i%s" % (counter, ext) - else: - name = ("part-%i_" % counter) + name - - # # FIXME: this code gets the paylods, then does something with it and then completely ignores it - # # writing the part.get_payload(decode=True) instead; and then the payload variable is - # # replaced by some dict later. - # # the `payloadToWrite` has been also ignored so was commented - # payload = part.get_payload() - # if isinstance(payload, list): - # payload = payload.pop().as_string() - # # payloadToWrite = payload - # # check version of python to ensure use of only the correct error type - # non_b64_err = TypeError - # try: - # logger.debug("Try to base64 decode the attachment payload") - # # payloadToWrite = base64.decodebytes(payload) - # except non_b64_err: - # logger.debug("Payload was not base64 encoded, using raw bytes") - # # payloadToWrite = payload - files.append(SimpleUploadedFile(name, part.get_payload( - decode=True), mimetypes.guess_type(name)[0])) - logger.debug("Found MIME attachment %s" % name) - + # See email.message_obj.Message.get_filename() + part_body, part_full_body = extract_part_data(part, counter, ticket_id, files, logger) + if part_body: + body = part_body + full_body = part_full_body counter += 1 if not body: - mail = BeautifulSoup(str(message), "html.parser") - beautiful_body = mail.find('body') - if beautiful_body: - try: - body = beautiful_body.text - full_body = body - except AttributeError: - pass - if not body: - body = "" + body, full_body = attempt_body_extract_from_html(message_obj) - add_file_if_always_save_incoming_email_message(files, message) + add_file_if_always_save_incoming_email_message(files, message_obj) - smtp_priority = message.get('priority', '') - smtp_importance = message.get('importance', '') + smtp_priority = message_obj.get('priority', '') + smtp_importance = message_obj.get('importance', '') high_priority_types = {'high', 'important', '1', 'urgent'} priority = 2 if high_priority_types & { smtp_priority, smtp_importance} else 3 @@ -733,4 +758,4 @@ def object_from_message(message: str, 'files': files, } - return create_object_from_email_message(message, ticket_id, payload, files, logger=logger) + return create_object_from_email_message(message_obj, ticket_id, payload, files, logger=logger) diff --git a/helpdesk/exceptions.py b/helpdesk/exceptions.py new file mode 100644 index 00000000..4dbf59a5 --- /dev/null +++ b/helpdesk/exceptions.py @@ -0,0 +1,13 @@ +class IgnoreTicketException(Exception): + """ + Raised when an email message is received from a sender who is marked to be ignored + """ + pass + + +class DeleteIgnoredTicketException(Exception): + """ + Raised when an email message is received from a sender who is marked to be ignored + and the record is tagged to delete the email from the inbox + """ + pass diff --git a/helpdesk/tests/test_get_email.py b/helpdesk/tests/test_get_email.py index 89afa95d..a3ae7aee 100644 --- a/helpdesk/tests/test_get_email.py +++ b/helpdesk/tests/test_get_email.py @@ -1,13 +1,18 @@ # -*- coding: utf-8 -*- + from django.contrib.auth.hashers import make_password from django.contrib.auth.models import User +from django.core.files.uploadedfile import SimpleUploadedFile from django.core.management import call_command from django.shortcuts import get_object_or_404 from django.test import override_settings, TestCase import helpdesk.email +from helpdesk.email import extract_part_data, object_from_message +from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException from helpdesk.management.commands.get_email import Command -from helpdesk.models import FollowUp, FollowUpAttachment, Queue, Ticket, TicketCC +from helpdesk.models import FollowUp, FollowUpAttachment, IgnoreEmail, Queue, Ticket, TicketCC +from helpdesk.tests import utils import itertools import logging import os @@ -15,6 +20,7 @@ from shutil import rmtree import six import sys from tempfile import mkdtemp +import typing from unittest import mock @@ -42,7 +48,7 @@ class GetEmailCommonTests(TestCase): with mock.patch.object(Command, 'handle', return_value=None) as mocked_handle: call_command('get_email', quiet=quiet_test_value) mocked_handle.assert_called_once() - for args, kwargs in mocked_handle.call_args_list: + for _, kwargs in mocked_handle.call_args_list: self.assertEqual(quiet_test_value, (kwargs['quiet'])) def test_email_with_blank_body_and_attachment(self): @@ -132,7 +138,41 @@ class GetEmailCommonTests(TestCase): assert "Hello there!" in FollowUp.objects.filter( ticket=ticket).first().comment + def test_will_delete_ignored_email(self): + """ + Tests if an email will be ignored if configured to do so and throws the correct exception + to ensure the email is deleted + """ + message, from_meta, _ = utils.generate_text_email(locale="es_ES") + ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=False) + ignore.save() + with self.assertRaises(DeleteIgnoredTicketException): + object_from_message(message.as_string(), self.queue_public, self.logger); + def test_will_not_delete_ignored_email(self): + """ + Tests if an email will be ignored if configured to do so and throws the correct exception + to ensure the email is NOT deleted + """ + message, from_meta, _ = utils.generate_text_email(locale="es_ES") + ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=True) + ignore.save() + with self.assertRaises(IgnoreTicketException): + object_from_message(message.as_string(), self.queue_public, self.logger); + + def test_utf8_filename_attachment(self): + """ + Tests if an attachment correctly sent with a UTF8 filename in disposition is extracted correctly + """ + filename = "TeléfonoMañana.txt" + part = utils.generate_file_mime_part(locale="es_ES", filename=filename) + files: typing.List[SimpleUploadedFile] = [] + extract_part_data(part, counter=1, ticket_id="tst1", files=files, logger=self.logger) + sent_file: SimpleUploadedFile = files[0] + # The extractor prepends a part identifier so compare the ending + self.assertTrue(sent_file.name.endswith(filename), f"Filename extracted does not match: {sent_file.name}") + + class GetEmailParametricTemplate(object): """TestCase that checks basic email functionality across methods and socks configs.""" @@ -258,12 +298,12 @@ class GetEmailParametricTemplate(object): """Tests correctly decoding mail headers when a comma is encoded into UTF-8. See bug report #832.""" - # example email text from Django docs: - # https://docs.djangoproject.com/en/1.10/ref/unicode/ - test_email_from = "Bernard-Bouissières, Benjamin " + # Create the from using standard RFC required formats + # Override the last_name to ensure we get a non-ascii character in it + test_email_from_meta = utils.generate_email_address("fr_FR", last_name_override="Bouissières") test_email_subject = "Commas in From lines" test_email_body = "Testing commas in from email UTF-8." - test_email = "To: helpdesk@example.com\nFrom: " + test_email_from + \ + test_email = "To: helpdesk@example.com\nFrom: " + test_email_from_meta[0] + \ "\nSubject: " + test_email_subject + "\n\n" + test_email_body test_mail_len = len(test_email) @@ -333,13 +373,13 @@ class GetEmailParametricTemplate(object): ticket1 = get_object_or_404(Ticket, pk=1) self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id) - self.assertEqual(ticket1.submitter_email, 'bbb@example.com') + self.assertEqual(ticket1.submitter_email, test_email_from_meta[1]) self.assertEqual(ticket1.title, test_email_subject) self.assertEqual(ticket1.description, test_email_body) ticket2 = get_object_or_404(Ticket, pk=2) self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id) - self.assertEqual(ticket2.submitter_email, 'bbb@example.com') + self.assertEqual(ticket2.submitter_email, test_email_from_meta[1]) self.assertEqual(ticket2.title, test_email_subject) self.assertEqual(ticket2.description, test_email_body) diff --git a/helpdesk/tests/utils.py b/helpdesk/tests/utils.py new file mode 100644 index 00000000..00b7e6a5 --- /dev/null +++ b/helpdesk/tests/utils.py @@ -0,0 +1,270 @@ +"""UItility functions facilitate making unit testing easier and less brittle.""" + + +from PIL import Image +import email +from email import encoders +from email.message import Message +from email.mime.base import MIMEBase +from email.mime.image import MIMEImage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +import factory +import faker +from io import BytesIO +from numpy.random import randint +import random +import re +import string +import typing +from typing import Any, Optional, Tuple +import unicodedata + + +def strip_accents(text): + """ + Strip accents from input String. (only works on Pythin 3 + + :param text: The input string. + :type text: String. + + :returns: The processed String. + :rtype: String. + """ + text = unicodedata.normalize('NFD', text) + text = text.encode('ascii', 'ignore') + text = text.decode("utf-8") + return str(text) + + +def text_to_id(text): + """ + Convert input text to id. + + :param text: The input string. + :type text: String. + + :returns: The processed String. + :rtype: String. + """ + text = strip_accents(text.lower()) + text = re.sub('[ ]+', '_', text) + text = re.sub('[^0-9a-zA-Z_-]', '', text) + return text + + +def get_random_string(length: int=16) -> str: + return "".join( + [random.choice(string.ascii_letters + string.digits) for _ in range(length)] + ) + + +def generate_random_image(image_format, array_dims): + """ + Creates an image from a random array. + + :param image_format: An image format (PNG or JPEG). + :param array_dims: A tuple with array dimensions. + + :returns: A byte string with encoded image + :rtype: bytes + """ + image_bytes = randint(low=0, high=255, size=array_dims, dtype='uint8') + io = BytesIO() + image_pil = Image.fromarray(image_bytes) + image_pil.save(io, image_format, subsampling=0, quality=100) + return io.getvalue() + + +def get_random_image(image_format: str="PNG", size: int=5): + """ + Returns a random image. + + Args: + image_format: An image format (PNG or JPEG). + + Returns: + A string with encoded image + """ + return generate_random_image(image_format, (size, size, 3)) + + +def get_fake(provider: str, locale: str = "en_US", min_length: int = 5) -> Any: + """ + Generates a random string, float, integer etc based on provider + Provider can be "text', 'sentence', "word" + e.g. `get_fake('name')` ==> 'Buzz Aldrin' + """ + string = factory.Faker(provider).evaluate({}, None, {'locale': locale,}) + while len(string) < min_length: + string += factory.Faker(provider).evaluate({}, None, {'locale': locale,}) + return string + + +def get_fake_html(locale: str = "en_US", wrap_in_body_tag=True) -> Any: + """ + Generates a random string, float, integer etc based on provider + Provider can be "text', 'sentence', + e.g. `get_fake('name')` ==> 'Buzz Aldrin' + """ + html = factory.Faker("sentence").evaluate({}, None, {'locale': locale,}) + for _ in range(0,4): + html += "
  • " + factory.Faker("sentence").evaluate({}, None, {'locale': locale,}) + "
  • " + for _ in range(0,4): + html += "

    " + factory.Faker("text").evaluate({}, None, {'locale': locale,}) + return f"{html}" if wrap_in_body_tag else html + +def generate_email_address( + locale: str="en_US", + use_short_email: bool=False, + real_name_format: Optional[str]="{last_name}, {first_name}", + last_name_override: Optional[str]=None) -> Tuple[str, str, str, str]: + ''' + Generate an RFC 2822 email address + + :param locale: change this to generate locale specific names + :param use_short_email: defaults to false. If true then does not include real name in email address + :param real_name_format: pass a different format if different than "{last_name}, {first_name}" + :param last_name_override: override the fake name if you want some special characters in the last name + :returns , , , Message: + """ + + :param locale: change this to generate locale specific file name and attachment content + :param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated + """ + part = MIMEBase('application', 'octet-stream') + part.set_payload(get_fake("text", locale=locale, min_length=1024)) + encoders.encode_base64(part) + if not filename: + filename = get_fake("word", locale=locale, min_length=8) + ".txt" + part.add_header('Content-Disposition', "attachment; filename= %s" % filename) + return part + +def generate_image_mime_part(locale: str="en_US",imagename: str = None) -> Message: + """ + + :param locale: change this to generate locale specific file name and attachment content + :param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated + """ + part = MIMEImage(generate_random_image(image_format="JPEG")) + part.set_payload(get_fake("text", locale=locale, min_length=1024)) + encoders.encode_base64(part) + if not imagename: + imagename = get_fake("word", locale=locale, min_length=8) + ".jpg" + part.add_header('Content-Disposition', "attachment; filename= %s" % imagename) + return part + +def generate_email_list(address_cnt: int = 3, + locale: str="en_US", + use_short_email: bool=False + ) -> str: + """ + Generates a list of email addresses formatted for email headers on a Mime part + + :param address_cnt: the number of email addresses to string together + :param locale: change this to generate locale specific "real names" and subject + :param use_short_email: produces a email address without "real name" if True + + """ + email_address_list = [generate_email_address(locale, use_short_email=use_short_email)[0] for _ in range(0, address_cnt)] + return ",".join(email_address_list) + +def add_simple_email_headers(message: Message, locale: str="en_US", + use_short_email: bool=False + ) -> typing.Tuple[typing.Tuple[str, str], typing.Tuple[str, str]]: + """ + Adds the key email headers to a Mime part + + :param message: the Mime part to add headers to + :param locale: change this to generate locale specific "real names" and subject + :param use_short_email: produces a "To" or "From" that is only the email address if True + + """ + to_meta = generate_email_address(locale, use_short_email=use_short_email) + from_meta = generate_email_address(locale, use_short_email=use_short_email) + + message['Subject'] = get_fake("sentence", locale=locale) + message['From'] = from_meta[0] + message['To'] = to_meta[0] + return from_meta, to_meta + +def generate_mime_part(locale: str="en_US", + part_type: str="plain", + use_short_email: bool=False + ) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]: + """ + Generates amime part of the sepecified type + + :param locale: change this to generate locale specific strings + :param text_type: options are plain, html, image (attachment), file (attachment) + :param use_short_email: produces a "To" or "From" that is only the email address if True + """ + if "plain" == part_type: + body = get_fake("text", locale=locale, min_length=1024) + msg = MIMEText(body) + elif "html" == part_type: + body = get_fake_html(locale=locale, wrap_in_body_tag=True) + msg = MIMEText(body) + elif "file" == part_type: + msg = generate_file_mime_part(locale=locale) + msg = MIMEText(body) + elif "image" == part_type: + msg = generate_image_mime_part(locale=locale) + msg = MIMEText(body) + else: + raise Exception("Mime part not implemented: " + part_type) + return msg + +def generate_multipart_email(locale: str="en_US", + type_list: typing.List[str]=["plain", "html", "attachment"], + use_short_email: bool=False + ) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]: + """ + Generates an email including headers with the defined multiparts + + :param locale: + :param type_list: options are plain, html, image (attachment), file (attachment) + :param use_short_email: produces a "To" or "From" that is only the email address if True + """ + msg = MIMEMultipart() + for part_type in type_list: + msg.append(generate_mime_part(locale=locale, part_type=part_type, use_short_email=use_short_email)) + from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email) + return msg, from_meta, to_meta + +def generate_text_email(locale: str="en_US", + use_short_email: bool=False + ) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]: + """ + Generates an email including headers + """ + body = get_fake("text", locale=locale, min_length=1024) + msg = MIMEText(body) + from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email) + return msg, from_meta, to_meta + +def generate_html_email(locale: str="en_US", + use_short_email: bool=False + ) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]: + """ + Generates an email including headers + """ + body = get_fake_html(locale=locale) + msg = MIMEText(body) + from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email) + return msg, from_meta, to_meta diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 00000000..2501f30c --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,5 @@ +tox +autopep8 +flake8 +pycodestyle +isort diff --git a/requirements-testing.txt b/requirements-testing.txt index 12adeae7..80872424 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -7,3 +7,6 @@ pbr mock freezegun isort +numpy +factory_boy +faker \ No newline at end of file