diff --git a/README.rst b/README.rst index 16a25755..49bcab6e 100644 --- a/README.rst +++ b/README.rst @@ -57,7 +57,7 @@ Installation `django-helpdesk` requires: * Python 3.8+ -* Django 3.2 LTS highly recommended (early adopters may test Django 4) +* Django 3.2 LTS or Django 4.* You can quickly install the latest stable version of `django-helpdesk` app via `pip`:: @@ -78,6 +78,9 @@ Developer Environment --------------------- Follow these steps to set up your development environment to contribute to helpdesk: + - check out the helpdesk app to your local file system:: + git clone https://github.com/django-helpdesk/django-helpdesk.git + - install a virtual environment - using virtualenv from the helpdesk base folder do:: virtualenv .venv && source .venv/bin/activate @@ -85,6 +88,12 @@ Follow these steps to set up your development environment to contribute to helpd - install the requirements for development:: pip install -r requirements.txt -r requirements-dev.txt + - install the requirements for testing as well:: + pip install -r requirements.txt -r requirements-dev.txt -r requirements-testing.txt + +To reactivate a VENV just run: + source .venv/bin/activate + To see option for the Makefile run: `make` The project enforces a standardized formatting in the CI/CD pipeline. To ensure you have the correct formatting run:: diff --git a/docs/settings.rst b/docs/settings.rst index d8f5f7af..874afa34 100644 --- a/docs/settings.rst +++ b/docs/settings.rst @@ -37,7 +37,6 @@ If you want to override the default settings for your users, create ``HELPDESK_D 'tickets_per_page': 25 } - Generic Options --------------- These changes are visible throughout django-helpdesk @@ -86,6 +85,10 @@ These changes are visible throughout django-helpdesk **Default:** ``HELPDESK_MAX_EMAIL_ATTACHMENT_SIZE = 512000`` +- **VALID_EXTENSIONS** Valid extensions for file types that can be attached to tickets + + **Default:** ``VALID_EXTENSIONS = ['.txt', '.asc', '.htm', '.html', '.pdf', '.doc', '.docx', '.odt', '.jpg', '.png', '.eml'] + - **QUEUE_EMAIL_BOX_UPDATE_ONLY** Only process mail with a valid tracking ID; all other mail will be ignored instead of creating a new ticket. **Default:** ``QUEUE_EMAIL_BOX_UPDATE_ONLY = False`` diff --git a/helpdesk/email.py b/helpdesk/email.py index 68aa06b6..08e14f96 100644 --- a/helpdesk/email.py +++ b/helpdesk/email.py @@ -6,8 +6,6 @@ See LICENSE for details. """ # import base64 - - from bs4 import BeautifulSoup from datetime import timedelta from django.conf import settings as django_settings @@ -18,7 +16,8 @@ from django.db.models import Q from django.utils import encoding, timezone from django.utils.translation import gettext as _ import email -from email.message import Message +from email import policy +from email.message import EmailMessage, MIMEPart from email.utils import getaddresses from email_reply_parser import EmailReplyParser from helpdesk import settings @@ -39,7 +38,7 @@ import ssl import sys from time import ctime import typing -from typing import List, Tuple +from typing import List # import User model, which may be a custom model @@ -53,6 +52,9 @@ STRIPPED_SUBJECT_STRINGS = [ "Automatic reply: ", ] +# Allow a custom default attached email name for the HTML formatted email if one is found +HTML_EMAIL_ATTACHMENT_FILENAME = _("email_html_body.html") + def process_email(quiet=False): for q in Queue.objects.filter( @@ -75,7 +77,6 @@ def process_email(quiet=False): logger.propagate = False if quiet: logger.propagate = False # do not propagate to root logger that would log to console - # Log messages to specific file only if the queue has it configured if (q.logging_type in logging_types) and q.logging_dir: # if it's enabled and the dir is set log_file_handler = logging.FileHandler( @@ -141,7 +142,7 @@ def pop3_sync(q, logger, server): full_message = encoding.force_str( "\n".join(raw_content), errors='replace') try: - ticket = object_from_message(message=full_message, queue=q, logger=logger) + ticket = extract_email_metadata(message=full_message, queue=q, logger=logger) except IgnoreTicketException: logger.warn( "Message %s was ignored and will be left on POP3 server" % msgNum) @@ -198,7 +199,7 @@ def imap_sync(q, logger, server): data = server.fetch(num, '(RFC822)')[1] full_message = encoding.force_str(data[0][1], errors='replace') try: - ticket = object_from_message(message=full_message, queue=q, logger=logger) + ticket = extract_email_metadata(message=full_message, queue=q, logger=logger) except IgnoreTicketException: logger.warn("Message %s was ignored and will be left on IMAP server" % num) except DeleteIgnoredTicketException: @@ -252,13 +253,11 @@ def imap_oauth_sync(q, logger, server): ) server.debug = settings.HELPDESK_IMAP_DEBUG_LEVEL - # TODO: Perhaps store the authentication string template externally? Settings? Queue Table? server.authenticate( "XOAUTH2", lambda x: f"user={q.email_box_user}\x01auth=Bearer {token['access_token']}\x01\x01".encode(), ) - # Select the Inbound Mailbox folder server.select(q.email_box_imap_folder) @@ -285,7 +284,7 @@ def imap_oauth_sync(q, logger, server): full_message = encoding.force_str(data[0][1], errors='replace') try: - ticket = object_from_message(message=full_message, queue=q, logger=logger) + ticket = extract_email_metadata(message=full_message, queue=q, logger=logger) except IgnoreTicketException as itex: logger.warn(f"Message {num} was ignored. {itex}") @@ -312,7 +311,6 @@ def imap_oauth_sync(q, logger, server): "IMAP retrieve failed. Is the folder '%s' spelled correctly, and does it exist on the server?", q.email_box_imap_folder ) - # Purged Flagged Messages & Logout server.expunge() server.close() @@ -405,7 +403,7 @@ def process_queue(q, logger): with open(m, 'r') as f: full_message = encoding.force_str(f.read(), errors='replace') try: - ticket = object_from_message(message=full_message, queue=q, logger=logger) + ticket = extract_email_metadata(message=full_message, queue=q, logger=logger) except IgnoreTicketException: logger.warn("Message %d was ignored and will be left in local directory", i) except DeleteIgnoredTicketException: @@ -429,7 +427,7 @@ def process_queue(q, logger): def decodeUnknown(charset, string): - if type(string) is not str: + if string and not isinstance(string, str): if not charset: try: return str(string, encoding='utf-8', errors='replace') @@ -468,11 +466,10 @@ def is_autoreply(message): def create_ticket_cc(ticket, cc_list): if not cc_list: return [] - # Local import to deal with non-defined / circular reference problem - from helpdesk.views.staff import subscribe_to_ticket_updates, User new_ticket_ccs = [] + from helpdesk.views.staff import subscribe_to_ticket_updates, User for __, cced_email in cc_list: cced_email = cced_email.strip() @@ -538,7 +535,6 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger) ticket.merged_to.ticket) # Use the ticket in which it was merged to for next operations ticket = ticket.merged_to - # New issue, create a new instance if ticket is None: if not settings.QUEUE_EMAIL_BOX_UPDATE_ONLY: @@ -555,7 +551,6 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger) (ticket.queue.slug, ticket.id)) new = True - # Old issue being re-opened elif ticket.status == Ticket.CLOSED_STATUS: ticket.status = Ticket.REOPENED_STATUS @@ -723,96 +718,222 @@ def attempt_body_extract_from_html(message: str) -> str: return body, full_body -def extract_part_data( - part: Message, +def mime_content_to_string(part: EmailMessage,) -> str: + ''' + Extract the content from the MIME body part + :param part: the MIME part to extract the content from + ''' + content_bytes = part.get_payload(decode=True) + charset = part.get_content_charset() + # The default for MIME email is 7bit which requires special decoding to utf-8 so make sure + # we handle the decoding correctly + if part['Content-Transfer-Encoding'] in [None, '8bit', '7bit'] and (charset == 'utf-8' or charset is None): + charset = "unicode_escape" + content = decodeUnknown(charset, content_bytes) + return content + + +def parse_email_content(mime_content: str, is_extract_full_email_msg: bool) -> str: + if is_extract_full_email_msg: + # Take the full content including encapsulated "forwarded" and "reply" sections + return mime_content + else: + # Just get the primary part of the email and drop off any text below the actual response text + return EmailReplyParser.parse_reply(mime_content) + + +def extract_email_message_content( + part: MIMEPart, + files: List, + include_chained_msgs: bool, +) -> (str, str): + ''' + Uses the get_body() method of the email package to extract the email message content. + If there is an HTML version of the email message content then it is stored as an attachment. + If there is a plain text part then that is used for storing the email content aginst the ticket. + Otherwise if there is just an HTML part then the HTML is parsed to extract a simple text message. + There is special handling for the case when a multipart/related part holds the message content when + there are multiple attachments to the email. + :param part: the base email MIME part to be searched + :param files: any MIME parts to be attached are added to this list + :param include_chained_msgs: flag to indicate if the entire email message content including past + replies must be extracted + ''' + message_part: MIMEPart = part.get_body() + parent_part: MIMEPart = part + content_type = message_part.get_content_type() + # Handle the possibility of a related part formatted email + if "multipart/related" == content_type: + # We want the actual message text so try again on the related MIME part + parent_part = message_part + message_part = message_part.get_body(preferencelist=["html", "plain",]) + content_type = message_part.get_content_type() + mime_content = None + formatted_body = None # Retain the original content by using a secondary variable if the HTML needs wrapping + if "text/html" == content_type: + # add the HTML message as an attachment wrapping if necessary + mime_content = mime_content_to_string(message_part) + if "{mime_content}" + if "\ + {mime_content if formatted_body is None else formatted_body}" + files.append( + SimpleUploadedFile( + HTML_EMAIL_ATTACHMENT_FILENAME, + (mime_content if formatted_body is None else formatted_body).encode("utf-8"), 'text/html', + ) + ) + # Try to get a plain part message + plain_message_part = parent_part.get_body(preferencelist=["plain",]) + if plain_message_part: + # Replace mime_content with the plain text part content + mime_content = mime_content_to_string(plain_message_part) + message_part = plain_message_part + content_type = "text/plain" + else: + # Try to constitute the HTML response as plain text + mime_content, _x = attempt_body_extract_from_html( + mime_content if formatted_body is None else formatted_body) + else: + # Is either text/plain or some random content-type so just decode the part content and store as is + mime_content = mime_content_to_string(message_part) + # We should now have the mime content + filtered_body = parse_email_content(mime_content, include_chained_msgs) + if not filtered_body or "" == filtered_body.strip(): + # A unit test that has a different HTML content to plain text which seems an invalid case as email + # tools should retain the HTML to be consistent with the plain text but manage this as a special case + # Try to constitute the HTML response as plain text + if formatted_body: + filtered_body, _x = attempt_body_extract_from_html(formatted_body) + else: + filtered_body = mime_content + # Only need the full message if the message_body excludes the chained messages + return filtered_body, mime_content + + +def process_as_attachment( + part: MIMEPart, counter: int, - ticket_id: int, files: List, logger: logging.Logger -) -> Tuple[str, str]: +): name = part.get_filename() if name: - name = email.utils.collapse_rfc2231_value(name) - part_body = None - part_full_body = None - if part.get_content_maintype() == 'text' and name is None: - if part.get_content_subtype() == 'plain': - part_body = part.get_payload(decode=True) - # https://github.com/django-helpdesk/django-helpdesk/issues/732 - if part['Content-Transfer-Encoding'] == '8bit' and part.get_content_charset() == 'utf-8': - part_body = part_body.decode('unicode_escape') - part_body = decodeUnknown(part.get_content_charset(), part_body) - # have to use django_settings here so overwriting it works in tests - # the default value is False anyway - if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False): - # first message in thread, we save full body to avoid - # losing forwards and things like that - part_full_body = get_body_from_fragments(part_body) - part_body = EmailReplyParser.parse_reply(part_body) - else: - # second and other reply, save only first part of the - # message - part_body = EmailReplyParser.parse_reply(part_body) - part_full_body = part_body - # workaround to get unicode text out rather than escaped text - part_body = get_encoded_body(part_body) - logger.debug("Discovered plain text MIME part") - else: - email_body = get_email_body_from_part_payload(part) - - if not part_body and not part_full_body: - # no text has been parsed so far - try such deep parsing - # for some messages - altered_body = email_body.replace( - "

", "

\n").replace("{email_body}" - - payload = ( - '' - '' - '' - '' - '%s' - '' - ) % email_body - files.append( - SimpleUploadedFile( - _("email_html_body.html"), payload.encode("utf-8"), 'text/html') - ) - logger.debug("Discovered HTML MIME part") + name = f"part-{counter}_{email.utils.collapse_rfc2231_value(name)}" else: - if not name: - ext = mimetypes.guess_extension(part.get_content_type()) - name = f"part-{counter}{ext}" - else: - name = f"part-{counter}_{name}" - payload = part.as_string() if part.is_multipart() else part.get_payload(decode=True) - files.append(SimpleUploadedFile(name, payload, mimetypes.guess_type(name)[0])) - logger.debug("Found MIME attachment %s", name) - return part_body, part_full_body + ext = mimetypes.guess_extension(part.get_content_type()) + name = f"part-{counter}{ext}" + # Extract payload accounting for attached multiparts + payload_bytes = part.as_bytes() if part.is_multipart() else part.get_payload(decode=True) + files.append(SimpleUploadedFile(name, payload_bytes, mimetypes.guess_type(name)[0])) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Processed MIME as attachment: %s", name) + return -def object_from_message(message: str, - queue: Queue, - logger: logging.Logger - ) -> Ticket: - # 'message' must be an RFC822 formatted message to correctly parse. - message_obj = email.message_from_string(message) - - subject = message_obj.get('subject', _('Comment from e-mail')) +def extract_email_subject(email_msg: EmailMessage,) -> str: + subject = email_msg.get('subject', _('Comment from e-mail')) subject = decode_mail_headers( - decodeUnknown(message_obj.get_charset(), subject)) + decodeUnknown(email_msg.get_charset(), subject)) for affix in STRIPPED_SUBJECT_STRINGS: subject = subject.replace(affix, "") - subject = subject.strip() + return subject.strip() + + +def extract_attachments( + target_part: MIMEPart, + files: List, + logger: logging.Logger, + counter: int = 1, + content_parts_excluded: bool = False, +) -> (int, bool): + ''' + If the MIME part is a multipart and not identified as "inline" or "attachment" then + iterate over the sub parts recursively. + Otherwise extract MIME part content and add as an attachment. + It will recursively descend as appropriate ensuring that all parts not part of the message content + are added to the list of files to be attached. To cater for the possibility of text/plain and text/html parts + that are further down in the multipart hierarchy than the ones that ones meant to provide that content, + iterators are selectively used. + :param part: the email MIME part to be processed + :param files: any MIME part content or MIME parts to be attached are added to this list + :param logger: the logger to use for this MIME part processing + :param counter: the count of MIME parts added as attachment + :param content_parts_excluded: the MIME part(s) that provided the message content have been excluded + :returns the count of mime parts added as attachments and a boolean if the content parts have been excluded + ''' + content_type = target_part.get_content_type() + content_maintype = target_part.get_content_maintype() + if "multipart" == content_maintype and target_part.get_content_disposition() not in ['inline', 'attachment']: + # Cycle through all MIME parts in the email extracting the attachments that were not part of the message body + # If this is a "related" multipart then we can use the message part excluder iterator directly + if "multipart/related" == content_type: + if content_parts_excluded: + # This should really never happen in a properly constructed email message but... + logger.warn( + "WARNING! Content type MIME parts have been excluded but a multipart/related has been encountered.\ + There may be missing information in attachments.") + else: + content_parts_excluded = True + # Use the iterator that automatically excludes message content parts + for part in target_part.iter_attachments(): + counter, content_parts_excluded = extract_attachments( + part, files, logger, counter, content_parts_excluded) + # The iterator must be different depending on whether we have already excluded message content parts + else: + # Content part might be 1 or 2 parts but will be at same level so track finding at least 1 + content_part_detected = False + for part in target_part.iter_parts(): + if not content_parts_excluded and part.get_content_type() in ["text/plain", "text/html"]: + content_part_detected = True + continue + # Recurse into the part to process embedded parts + counter, content_parts_excluded = extract_attachments( + part, files, logger, counter, content_parts_excluded) + # If we have found 1 or more content parts then flag that the content parts have been ommitted + # to ensure that other text/* parts are attached + if content_part_detected: + content_parts_excluded = True + else: + process_as_attachment(target_part, counter, files, logger) + counter = counter + 1 + return (counter, content_parts_excluded) + + +def extract_email_metadata(message: str, + queue: Queue, + logger: logging.Logger + ) -> Ticket: + ''' + Extracts the text/plain mime part if there is one as the ticket description and + stores the text/html part as an attachment if it is present. + If no text/plain part is present then it will try to use the text/html part if + it is present as the ticket description by removing the HTML formatting. + If neither a text/plain or text/html is present then it will use the first text/* + MIME part that it finds as the ticket description. + By default it will always take only the actual message and drop any chained messages + from replies. + The HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL settings can force the entire message to be + stored in the ticket if it is a new ticket by setting it to True. + In this scenario, if it is a reply that is a forwarded message with no actual message, + then the description will be sourced from the text/html part and the forwarded message + will be in the FollowUp record associated with the ticket. + It will iterate over every MIME part and store all MIME parts as attachments apart + from the text/plain part. + There may be a case for trying to exclude repeated signature images by checking if an + attachment of the same name already exists as an attachment on the ticket but that is + not implemented. + :param message: the raw email message received + :param queue: the queue that the message is assigned to + :param logger: the logger to be used + ''' + # 'message' must be an RFC822 formatted message to correctly parse. + # NBot sure why but policy explicitly set to default is required for any messages with attachments in them + message_obj: EmailMessage = email.message_from_string(message, EmailMessage, policy=policy.default) + + subject = extract_email_subject(message_obj) - # TODO: Should really be assigning a properly formatted fake email. - # Check if anything relies on this being a "real name" formatted string if no sender is found on message_obj. - # Also not sure it should be accepting emails from unknown senders sender_email = _('Unknown Sender') sender_hdr = message_obj.get('from') if sender_hdr: @@ -831,26 +952,24 @@ def object_from_message(message: str, subject, logger ) - - body = None - full_body = None - counter = 0 files = [] - - for part in message_obj.walk(): - if part.get_content_maintype() == 'multipart': - continue - # See email.message_obj.Message.get_filename() - part_body, part_full_body = extract_part_data(part, counter, ticket_id, files, logger) - if part_body: - body = part_body - full_body = part_full_body - counter += 1 - - if not body: - body, full_body = attempt_body_extract_from_html(message_obj) - - add_file_if_always_save_incoming_email_message(files, message_obj) + # first message in thread, we save full body to avoid losing forwards and things like that + include_chained_msgs = True if ticket_id is None and getattr( + django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False) else False + filtered_body, full_body = extract_email_message_content(message_obj, files, include_chained_msgs) + # If the base part is not a multipart then it will have already been processed as the vbody content so + # no need to process attachments + if "multipart" == message_obj.get_content_maintype(): + # Find and attach all other parts or part contents as attachments + counter, content_parts_excluded = extract_attachments(message_obj, files, logger) + if not content_parts_excluded: + # Unexpected situation and may mean there is a hole in the email processing logic + logger.warning( + "Failed to exclude email content when parsing all MIME parts in the multipart.\ + Verify that there were no text/* parts containing message content.") + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Email parsed and %s attachments were found and attached.", counter) + add_file_if_always_save_incoming_email_message(files, message) smtp_priority = message_obj.get('priority', '') smtp_importance = message_obj.get('importance', '') @@ -859,8 +978,8 @@ def object_from_message(message: str, smtp_priority, smtp_importance} else 3 payload = { - 'body': body, - 'full_body': full_body or body, + 'body': filtered_body, + 'full_body': full_body, 'subject': subject, 'queue': queue, 'sender_email': sender_email, diff --git a/helpdesk/lib.py b/helpdesk/lib.py index d265963b..ac0d46bc 100644 --- a/helpdesk/lib.py +++ b/helpdesk/lib.py @@ -186,11 +186,11 @@ def format_time_spent(time_spent): def convert_value(value): """ Convert date/time data type to known fixed format string """ - if type(value) == datetime: + if type(value) is datetime: return value.strftime(CUSTOMFIELD_DATETIME_FORMAT) - elif type(value) == date: + elif type(value) is date: return value.strftime(CUSTOMFIELD_DATE_FORMAT) - elif type(value) == time: + elif type(value) is time: return value.strftime(CUSTOMFIELD_TIME_FORMAT) else: return value diff --git a/helpdesk/settings.py b/helpdesk/settings.py index 7dc7ac5c..100112ff 100644 --- a/helpdesk/settings.py +++ b/helpdesk/settings.py @@ -229,7 +229,7 @@ HELPDESK_ENABLE_PER_QUEUE_STAFF_PERMISSION = getattr( # use https in the email links HELPDESK_USE_HTTPS_IN_EMAIL_LINK = getattr( - settings, 'HELPDESK_USE_HTTPS_IN_EMAIL_LINK', False) + settings, 'HELPDESK_USE_HTTPS_IN_EMAIL_LINK', settings.SECURE_SSL_REDIRECT) HELPDESK_TEAMS_MODEL = getattr( settings, 'HELPDESK_TEAMS_MODEL', 'pinax_teams.Team') diff --git a/helpdesk/templated_email.py b/helpdesk/templated_email.py index 5c98cdc7..e68489a9 100644 --- a/helpdesk/templated_email.py +++ b/helpdesk/templated_email.py @@ -97,7 +97,7 @@ def send_templated_mail(template_name, if isinstance(recipients, str): if recipients.find(','): recipients = recipients.split(',') - elif type(recipients) != list: + elif type(recipients) is not list: recipients = [recipients] msg = EmailMultiAlternatives(subject_part, text_part, diff --git a/helpdesk/tests/test_get_email.py b/helpdesk/tests/test_get_email.py index f7d37800..abd346ba 100644 --- a/helpdesk/tests/test_get_email.py +++ b/helpdesk/tests/test_get_email.py @@ -7,11 +7,14 @@ from django.core.files.uploadedfile import SimpleUploadedFile from django.core.management import call_command from django.shortcuts import get_object_or_404 from django.test import override_settings, TestCase +from email.mime.message import MIMEMessage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText import helpdesk.email -from helpdesk.email import extract_part_data, object_from_message +from helpdesk.email import extract_email_metadata, process_as_attachment from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException from helpdesk.management.commands.get_email import Command -from helpdesk.models import Attachment, FollowUp, FollowUpAttachment, IgnoreEmail, Queue, Ticket, TicketCC +from helpdesk.models import FollowUp, FollowUpAttachment, IgnoreEmail, Queue, Ticket, TicketCC from helpdesk.tests import utils import itertools import logging @@ -59,7 +62,7 @@ class GetEmailCommonTests(TestCase): """ with open(os.path.join(THIS_DIR, "test_files/blank-body-with-attachment.eml"), encoding="utf-8") as fd: test_email = fd.read() - ticket = helpdesk.email.object_from_message( + ticket = helpdesk.email.extract_email_metadata( test_email, self.queue_public, self.logger) # title got truncated because of max_lengh of the model.title field assert ticket.title == ( @@ -67,7 +70,7 @@ class GetEmailCommonTests(TestCase): "ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo" "ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo..." ) - self.assertEqual(ticket.description, "") + self.assertEqual(ticket.description.strip(), "", msg=ticket.description) def test_email_with_quoted_printable_body(self): """ @@ -75,7 +78,7 @@ class GetEmailCommonTests(TestCase): """ with open(os.path.join(THIS_DIR, "test_files/quoted_printable.eml"), encoding="utf-8") as fd: test_email = fd.read() - ticket = helpdesk.email.object_from_message( + ticket = helpdesk.email.extract_email_metadata( test_email, self.queue_public, self.logger) self.assertEqual(ticket.title, "Český test") self.assertEqual(ticket.description, @@ -96,24 +99,29 @@ class GetEmailCommonTests(TestCase): """ with open(os.path.join(THIS_DIR, "test_files/all-special-chars.eml"), encoding="utf-8") as fd: test_email = fd.read() - ticket = helpdesk.email.object_from_message( + ticket = helpdesk.email.extract_email_metadata( test_email, self.queue_public, self.logger) self.assertEqual(ticket.title, "Testovácí email") self.assertEqual(ticket.description, "íářčšáíéřášč") - @override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=True) + @override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=False) def test_email_with_utf_8_non_decodable_sequences(self): """ Tests that emails with utf-8 non-decodable sequences are parsed correctly - The message is fowarded as well + The forwarded part of the message must still be in the ticket description if + HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True. + Otherwise it should only be in the Followup records Comment field + NOTE: This test weirdly tests having completely different content in the HTML part + than the PLAIN part so not sure about the validity of this as a test but + the code does support this oddity anyway for backwards compatibility """ with open(os.path.join(THIS_DIR, "test_files/utf-nondecodable.eml"), encoding="utf-8") as fd: test_email = fd.read() - ticket = helpdesk.email.object_from_message( + ticket = helpdesk.email.extract_email_metadata( test_email, self.queue_public, self.logger) self.assertEqual( ticket.title, "Fwd: Cyklozaměstnavatel - změna vyhodnocení") - self.assertIn("prosazuje lepší", ticket.description) + self.assertIn("prosazuje lepší", ticket.description, "Missing text \"prosazuje lepší\" in description: %s" % ticket.description) followups = FollowUp.objects.filter(ticket=ticket) followup = followups[0] attachments = FollowUpAttachment.objects.filter(followup=followup) @@ -122,21 +130,40 @@ class GetEmailCommonTests(TestCase): attachment.file.read().decode("utf-8")) @override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=True) - def test_email_with_forwarded_message(self): + def test_email_with_forwarded_message_just_message_stored(self): """ - Forwarded message of that format must be still attached correctly + The forwarded part of the message must still be in the ticket description if + HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True. + Otherwise it should only be in the Followup records Comment field """ with open(os.path.join(THIS_DIR, "test_files/forwarded-message.eml"), encoding="utf-8") as fd: test_email = fd.read() - ticket = helpdesk.email.object_from_message( + ticket = helpdesk.email.extract_email_metadata( test_email, self.queue_public, self.logger) self.assertEqual( ticket.title, "Test with original message from GitHub") self.assertIn("This is email body", ticket.description) - assert "Hello there!" not in ticket.description, ticket.description - assert FollowUp.objects.filter(ticket=ticket).count() == 1 - assert "Hello there!" in FollowUp.objects.filter( - ticket=ticket).first().comment + self.assertTrue("Hello there!" in ticket.description, ticket.description) + self.assertTrue(FollowUp.objects.filter(ticket=ticket).count() == 1) + + @override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=False) + def test_email_with_forwarded_message_chained_messages_stored(self): + """ + The forwarded part of the message must still be in the ticket description if + HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True. + Otherwise it should only be in the Followup records Comment field + """ + with open(os.path.join(THIS_DIR, "test_files/forwarded-message.eml"), encoding="utf-8") as fd: + test_email = fd.read() + ticket = helpdesk.email.extract_email_metadata( + test_email, self.queue_public, self.logger) + self.assertEqual( + ticket.title, "Test with original message from GitHub") + self.assertIn("This is email body", ticket.description) + self.assertTrue("Hello there!" not in ticket.description, ticket.description) + followups = FollowUp.objects.filter(ticket=ticket).values("comment") + self.assertTrue(followups.count() == 1) + self.assertTrue("Hello there!" in followups[0]["comment"], followups[0]["comment"]) def test_will_delete_ignored_email(self): """ @@ -147,7 +174,7 @@ class GetEmailCommonTests(TestCase): ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=False) ignore.save() with self.assertRaises(DeleteIgnoredTicketException): - object_from_message(message.as_string(), self.queue_public, self.logger) + extract_email_metadata(message.as_string(), self.queue_public, self.logger) def test_will_not_delete_ignored_email(self): """ @@ -158,7 +185,7 @@ class GetEmailCommonTests(TestCase): ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=True) ignore.save() with self.assertRaises(IgnoreTicketException): - object_from_message(message.as_string(), self.queue_public, self.logger) + extract_email_metadata(message.as_string(), self.queue_public, self.logger) def test_utf8_filename_attachment(self): """ @@ -167,7 +194,7 @@ class GetEmailCommonTests(TestCase): filename = "TeléfonoMañana.txt" part = utils.generate_file_mime_part(locale="es_ES", filename=filename) files: typing.List[SimpleUploadedFile] = [] - extract_part_data(part, counter=1, ticket_id="tst1", files=files, logger=self.logger) + process_as_attachment(part, counter=1, files=files, logger=self.logger) sent_file: SimpleUploadedFile = files[0] # The extractor prepends a part identifier so compare the ending self.assertTrue(sent_file.name.endswith(filename), f"Filename extracted does not match: {sent_file.name}") @@ -179,17 +206,17 @@ class GetEmailCommonTests(TestCase): """ message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image']) - self.assertEqual(len(mail.outbox), 0) + self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable with self.assertLogs(logger='helpdesk', level='ERROR') as cm: - object_from_message(message.as_string(), self.queue_public, self.logger) + extract_email_metadata(message.as_string(), self.queue_public, self.logger) self.assertIn( "ERROR:helpdesk:['Unsupported file extension: .jpg']", cm.output ) - self.assertEqual(len(mail.outbox), 1) + self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject) def test_multiple_attachments(self): @@ -198,11 +225,11 @@ class GetEmailCommonTests(TestCase): """ message, _, _ = utils.generate_multipart_email(type_list=['plain', 'file', 'image']) - self.assertEqual(len(mail.outbox), 0) + self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable - object_from_message(message.as_string(), self.queue_public, self.logger) + extract_email_metadata(message.as_string(), self.queue_public, self.logger) - self.assertEqual(len(mail.outbox), 1) + self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject) ticket = Ticket.objects.get() @@ -216,10 +243,10 @@ class GetEmailCommonTests(TestCase): """ message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image', 'file', 'image']) - self.assertEqual(len(mail.outbox), 0) + self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable with self.assertLogs(logger='helpdesk', level='ERROR') as cm: - object_from_message(message.as_string(), self.queue_public, self.logger) + extract_email_metadata(message.as_string(), self.queue_public, self.logger) self.assertIn( "ERROR:helpdesk:['Unsupported file extension: .jpg']", @@ -240,18 +267,78 @@ class GetEmailCommonTests(TestCase): att_content = email_attachment.as_string() message.attach(utils.generate_file_mime_part(filename=att_filename, content=att_content)) - object_from_message(message.as_string(), self.queue_public, self.logger) - - self.assertEqual(len(mail.outbox), 1) + extract_email_metadata(message.as_string(), self.queue_public, self.logger) + self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject) ticket = Ticket.objects.get() followup = ticket.followup_set.get() - att_retrieved: Attachment = followup.followupattachment_set.get() - self.assertTrue(att_retrieved.filename.endswith(att_filename), "Filename of attached multipart not detected: %s" % (att_retrieved.filename)) - with att_retrieved.file.open('r') as f: - retrieved_content = f.read() - self.assertEquals(att_content, retrieved_content, "Retrieved attachment content different to original :\n\n%s\n\n%s" % (att_content, retrieved_content)) + + for att_retrieved in followup.followupattachment_set.all(): + if (helpdesk.email.HTML_EMAIL_ATTACHMENT_FILENAME == att_retrieved.filename): + # Ignore the HTML formatted conntent of the email that is attached + continue + self.assertTrue(att_retrieved.filename.endswith(att_filename), "Filename of attached multipart not detected: %s" % (att_retrieved.filename)) + with att_retrieved.file.open('r') as f: + retrieved_content = f.read() + self.assertEquals(att_content, retrieved_content, "Retrieved attachment content different to original :\n\n%s\n\n%s" % (att_content, retrieved_content)) + + def test_email_with_inline_and_multipart_as_attachments(self): + """ + Test a multipart email with an inline attachment and a multipart email attachment to the email + """ + inline_att_filename = 'inline_attachment.jpg' + email_att_filename = 'email_attachment2.eml' + # Create the inline attachment for the email using an ID so we can reference it in the email body + inline_image_id = "liTE5er6Dbnd" + inline_attachment = utils.generate_image_mime_part(locale="en_US", imagename=inline_att_filename, disposition_primary_type="inline") + inline_attachment.add_header('X-Attachment-Id', inline_image_id) + inline_attachment.add_header('Content-ID', '<' + inline_image_id + '>') + # Create the actual email with its plain and HTML parts + alt_email_message = MIMEMultipart("alternative") + # Create the plain and HTML that will reference the inline attachment + plain_body = "Test with inline image: \n[image: " + inline_att_filename + "]\n\n" + plain_msg = MIMEText(plain_body) + alt_email_message.attach(plain_msg) + html_body = '
Test with inline image: 3D="'
' + html_msg = MIMEText(html_body, "html") + alt_email_message.attach(html_msg) + # Create the email to be attached and attach that as well + email_to_be_attached, _, _ = utils.generate_multipart_email(type_list=['plain', 'html']) + email_as_attachment = MIMEMessage(email_to_be_attached) + email_as_attachment.add_header('Content-Disposition', 'attachment', filename=email_att_filename) + # Now create the base multipart and attach all the other parts to it + related_message = MIMEMultipart("related") + related_message.attach(alt_email_message) + related_message.attach(inline_attachment) + base_message = MIMEMultipart("mixed") + base_message.attach(related_message) + base_message.attach(email_as_attachment) + utils.add_simple_email_headers(base_message, locale="en_US", use_short_email=True) + # Now send the part to the email workflow + extract_email_metadata(base_message.as_string(), self.queue_public, self.logger) + + self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable + self.assertEqual(f'[test-1] {base_message.get("subject")} (Opened)', mail.outbox[0].subject) + + ticket = Ticket.objects.get() + followup = ticket.followup_set.get() + # Check both the inline and attachment are stored as attached files + inline_found = False + email_attachment_found = False + for att_retrieved in followup.followupattachment_set.all(): + if (helpdesk.email.HTML_EMAIL_ATTACHMENT_FILENAME == att_retrieved.filename): + # Ignore the HTML formatted content of the email that is attached + continue + if att_retrieved.filename.endswith(inline_att_filename): + inline_found = True + elif att_retrieved.filename.endswith(email_att_filename): + email_attachment_found = True + else: + print(f"\n\n%%%%%% {att_retrieved}") + self.assertTrue(False, "Unexpected file in ticket attachments: %s" % att_retrieved.filename) + self.assertTrue(email_attachment_found, "Email attachment file not found ticket attachments: %s" % (email_att_filename)) + self.assertTrue(inline_found, "Inline file not found in email: %s" % (inline_att_filename)) class GetEmailParametricTemplate(object): @@ -424,7 +511,6 @@ class GetEmailParametricTemplate(object): def test_commas_in_mail_headers(self): """Tests correctly decoding mail headers when a comma is encoded into UTF-8. See bug report #832.""" - # Create the from using standard RFC required formats # Override the last_name to ensure we get a non-ascii character in it test_email_from_meta = utils.generate_email_address("fr_FR", last_name_override="Bouissières") @@ -488,7 +574,6 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server = mock.Mock() mocked_imaplib_server.search = mock.Mock( return_value=imap_mail_list) - # we ignore the second arg as the data item/mime-part is # constant (RFC822) mocked_imaplib_server.fetch = mock.Mock( @@ -510,7 +595,6 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server = mock.Mock() mocked_imaplib_server.search = mock.Mock( return_value=imap_mail_list) - # we ignore the second arg as the data item/mime-part is # constant (RFC822) mocked_imaplib_server.fetch = mock.Mock( @@ -553,7 +637,6 @@ class GetEmailParametricTemplate(object): except this time the email body contains a Django template tag. For each email source supported, we mock the backend to provide authentically formatted responses containing our test data.""" - # example email text from Django docs: # https://docs.djangoproject.com/en/1.10/ref/unicode/ test_email_from = "Arnbjörg Ráðormsdóttir " @@ -617,7 +700,6 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server = mock.Mock() mocked_imaplib_server.search = mock.Mock( return_value=imap_mail_list) - # we ignore the second arg as the data item/mime-part is # constant (RFC822) mocked_imaplib_server.fetch = mock.Mock( @@ -639,7 +721,6 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server = mock.Mock() mocked_imaplib_server.search = mock.Mock( return_value=imap_mail_list) - # we ignore the second arg as the data item/mime-part is # constant (RFC822) mocked_imaplib_server.fetch = mock.Mock( @@ -680,12 +761,8 @@ class GetEmailParametricTemplate(object): emails from a queue and creating tickets. For each email source supported, we mock the backend to provide authentically formatted responses containing our test data.""" - # example email text from Python docs: # https://docs.python.org/3/library/email-examples.html - from email.mime.multipart import MIMEMultipart - from email.mime.text import MIMEText - me = "my@example.com" you = "your@example.com" # NOTE: CC'd emails need to be alphabetical and tested as such! @@ -695,7 +772,6 @@ class GetEmailParametricTemplate(object): cc_two = "other@example.com" cc = cc_one + ", " + cc_two subject = "Link" - # Create message container - the correct MIME type is # multipart/alternative. msg = MIMEMultipart('alternative') @@ -703,7 +779,6 @@ class GetEmailParametricTemplate(object): msg['From'] = me msg['To'] = you msg['Cc'] = cc - # Create the body of the message (a plain-text and an HTML version). text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttps://www.python.org" html = """\ @@ -717,11 +792,9 @@ class GetEmailParametricTemplate(object): """ - # Record the MIME types of both parts - text/plain and text/html. part1 = MIMEText(text, 'plain') part2 = MIMEText(html, 'html') - # Attach parts into message container. # According to RFC 2046, the last part of a multipart message, in this case # the HTML message, is best and preferred. @@ -785,7 +858,6 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server = mock.Mock() mocked_imaplib_server.search = mock.Mock( return_value=imap_mail_list) - # we ignore the second arg as the data item/mime-part is # constant (RFC822) mocked_imaplib_server.fetch = mock.Mock( @@ -807,7 +879,6 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server = mock.Mock() mocked_imaplib_server.search = mock.Mock( return_value=imap_mail_list) - # we ignore the second arg as the data item/mime-part is # constant (RFC822) mocked_imaplib_server.fetch = mock.Mock( @@ -867,7 +938,6 @@ class GetEmailParametricTemplate(object): def test_read_pgp_signed_email(self): """Tests reading a PGP signed email to ensure we handle base64 and PGP signatures appropriately.""" - # example email text from #567 on GitHub with open(os.path.join(THIS_DIR, "test_files/pgp.eml"), encoding="utf-8") as fd: test_email = fd.read() @@ -923,7 +993,6 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server = mock.Mock() mocked_imaplib_server.search = mock.Mock( return_value=imap_mail_list) - # we ignore the second arg as the data item/mime-part is # constant (RFC822) mocked_imaplib_server.fetch = mock.Mock( @@ -944,7 +1013,6 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server = mock.Mock() mocked_imaplib_server.search = mock.Mock( return_value=imap_mail_list) - # we ignore the second arg as the data item/mime-part is # constant (RFC822) mocked_imaplib_server.fetch = mock.Mock( @@ -1001,8 +1069,6 @@ a9eiiQ+3V1v+7wWHXCzq """) # should this be 'application/pgp-signature'? # self.assertEqual(attach1.mime_type, 'text/plain') - - class GetEmailCCHandling(TestCase): """TestCase that checks CC handling in email. Needs its own test harness.""" @@ -1102,7 +1168,6 @@ class GetEmailCCHandling(TestCase): ", " + test_email_cc_three + ", " + test_email_cc_four + ", " + ticket_user_emails + \ "\nFrom: " + test_email_from + "\nSubject: " + \ test_email_subject + "\n\n" + test_email_body - test_mail_len = len(test_email) with mock.patch('os.listdir') as mocked_listdir, \ mock.patch('helpdesk.email.isfile') as mocked_isfile, \ diff --git a/helpdesk/tests/test_query.py b/helpdesk/tests/test_query.py index 9e7d8842..8e2c1533 100644 --- a/helpdesk/tests/test_query.py +++ b/helpdesk/tests/test_query.py @@ -58,12 +58,13 @@ class QueryTests(TestCase): query = query_to_base64({}) response = self.client.get( reverse('helpdesk:datatables_ticket_list', args=[query])) + resp_json = response.json() self.assertEqual( - response.json(), + resp_json, { "data": - [{"ticket": "1 [test_queue-1]", "id": 1, "priority": 3, "title": "unassigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", "created": "now", "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": ""}, - {"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", "created": "now", "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}], + [{"ticket": "1 [test_queue-1]", "id": 1, "priority": 3, "title": "unassigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", "created": resp_json["data"][0]["created"], "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": ""}, + {"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", "created": resp_json["data"][1]["created"], "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}], "recordsFiltered": 2, "recordsTotal": 2, "draw": 0, @@ -77,12 +78,13 @@ class QueryTests(TestCase): ) response = self.client.get( reverse('helpdesk:datatables_ticket_list', args=[query])) + resp_json = response.json() self.assertEqual( - response.json(), + resp_json, { "data": [{"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", - "created": "now", "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}], + "created": resp_json["data"][0]["created"], "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}], "recordsFiltered": 1, "recordsTotal": 1, "draw": 0, @@ -96,12 +98,13 @@ class QueryTests(TestCase): ) response = self.client.get( reverse('helpdesk:datatables_ticket_list', args=[query])) + resp_json = response.json() self.assertEqual( - response.json(), + resp_json, { "data": [{"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", - "created": "now", "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}], + "created": resp_json["data"][0]["created"], "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}], "recordsFiltered": 1, "recordsTotal": 1, "draw": 0, diff --git a/helpdesk/tests/test_ticket_submission.py b/helpdesk/tests/test_ticket_submission.py index 2e1da9ac..06d6fe0d 100644 --- a/helpdesk/tests/test_ticket_submission.py +++ b/helpdesk/tests/test_ticket_submission.py @@ -2,15 +2,12 @@ from django.contrib.auth import get_user_model from django.core import mail -from django.core.exceptions import ObjectDoesNotExist -from django.forms import ValidationError from django.test import TestCase from django.test.client import Client from django.urls import reverse import email -from helpdesk.email import create_ticket_cc, object_from_message +from helpdesk.email import extract_email_metadata from helpdesk.models import CustomField, FollowUp, KBCategory, KBItem, Queue, Ticket, TicketCC -from helpdesk.tests.helpers import print_response import logging from urllib.parse import urlparse import uuid @@ -285,7 +282,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -317,7 +314,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) ticket = Ticket.objects.get( title=self.ticket_data['title'], queue=self.queue_public, submitter_email=submitter_email) @@ -353,7 +350,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -400,7 +397,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -452,7 +449,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -498,7 +495,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -532,7 +529,7 @@ class EmailInteractionsTestCase(TestCase): reply.__setitem__('Content-Type', 'text/plain;') reply.set_payload(self.ticket_data['description']) - object_from_message(str(reply), self.queue_public, logger=logger) + extract_email_metadata(str(reply), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -588,7 +585,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -632,7 +629,7 @@ class EmailInteractionsTestCase(TestCase): reply.__setitem__('Content-Type', 'text/plain;') reply.set_payload(self.ticket_data['description']) - object_from_message(str(reply), self.queue_public, logger=logger) + extract_email_metadata(str(reply), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -686,7 +683,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -741,7 +738,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(reply), self.queue_public, logger=logger) + extract_email_metadata(str(reply), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -783,7 +780,7 @@ class EmailInteractionsTestCase(TestCase): msg.set_payload(self.ticket_data['description']) email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -833,7 +830,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message( + extract_email_metadata( str(msg), self.queue_public_with_notifications_disabled, logger=logger) followup = FollowUp.objects.get(message_id=message_id) @@ -880,7 +877,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -918,7 +915,7 @@ class EmailInteractionsTestCase(TestCase): reply.__setitem__('Content-Type', 'text/plain;') reply.set_payload(self.ticket_data['description']) - object_from_message(str(reply), self.queue_public, logger=logger) + extract_email_metadata(str(reply), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -964,7 +961,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message( + extract_email_metadata( str(msg), self.queue_public_with_notifications_disabled, logger=logger) followup = FollowUp.objects.get(message_id=message_id) @@ -1003,7 +1000,7 @@ class EmailInteractionsTestCase(TestCase): reply.__setitem__('Content-Type', 'text/plain;') reply.set_payload(self.ticket_data['description']) - object_from_message( + extract_email_metadata( str(reply), self.queue_public_with_notifications_disabled, logger=logger) followup = FollowUp.objects.get(message_id=message_id) @@ -1037,7 +1034,7 @@ class EmailInteractionsTestCase(TestCase): email_count = len(mail.outbox) - object_from_message(str(msg), self.queue_public, logger=logger) + extract_email_metadata(str(msg), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -1059,7 +1056,7 @@ class EmailInteractionsTestCase(TestCase): reply.__setitem__('Content-Type', 'text/plain;') reply.set_payload(self.ticket_data['description']) - object_from_message(str(reply), self.queue_public, logger=logger) + extract_email_metadata(str(reply), self.queue_public, logger=logger) followup = FollowUp.objects.get(message_id=message_id) ticket = Ticket.objects.get(id=followup.ticket.id) @@ -1088,23 +1085,48 @@ class EmailInteractionsTestCase(TestCase): cat = KBCategory.objects.create( title="Test Cat", slug="test_cat", - description="This is a test category", + description="This is a test category", queue=self.queue_public, ) cat.save() + attr_list = { + "f1_field_title": "KBItem 1", + "f1_attr": "kbitem", + "f1_attr_value": "1", + "f2_attr": "submitter_email", + "f2_attr_value": "foo@bar.cz", + "f3_attr": "title", + "f3_attr_value": "lol", + } self.kbitem1 = KBItem.objects.create( category=cat, - title="KBItem 1", + title=attr_list["f1_field_title"], question="What?", answer="A KB Item", ) self.kbitem1.save() - cat_url = reverse('helpdesk:submit') + \ - "?kbitem=1&submitter_email=foo@bar.cz&title=lol" + cat_url = reverse('helpdesk:submit') + '?' \ + + attr_list["f1_attr"] + '=' + attr_list["f1_attr_value"] + '&' \ + + attr_list["f2_attr"] + '=' + attr_list["f2_attr_value"] + '&' \ + + attr_list["f3_attr"] + '=' + attr_list["f3_attr_value"] response = self.client.get(cat_url) + # Get the rendered response to make it easier to debug if things go wrong + if ( + hasattr(response, "render") + and callable(response.render) + and not response.is_rendered + ): + response.render() + if response.streaming: + content = b"".join(response.streaming_content) + else: + content = response.content + + + msg_prefix = content.decode(response.charset) self.assertContains( - response, '') + response, '', msg_prefix = msg_prefix) self.assertContains( - response, '') + response, '') + response, ' Message: +def generate_image_mime_part(locale: str="en_US",imagename: str = None, disposition_primary_type: str = "attachment") -> Message: """ :param locale: change this to generate locale specific file name and attachment content :param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated """ part = MIMEImage(generate_random_image(image_format="JPEG", array_dims=(200, 200))) - part.set_payload(get_fake("text", locale=locale, min_length=1024)) + #part.set_payload(get_fake("text", locale=locale, min_length=1024)) encoders.encode_base64(part) if not imagename: imagename = get_fake("word", locale=locale, min_length=8) + ".jpg" - part.add_header('Content-Disposition', "attachment; filename= %s" % imagename) + part.add_header('Content-Disposition', disposition_primary_type + "; filename= %s" % imagename) return part def generate_email_list(address_cnt: int = 3, @@ -215,10 +215,10 @@ def generate_mime_part(locale: str="en_US", """ if "plain" == part_type: body = get_fake("text", locale=locale, min_length=1024) - msg = MIMEText(body) + msg = MIMEText(body, part_type) elif "html" == part_type: body = get_fake_html(locale=locale, wrap_in_body_tag=True) - msg = MIMEText(body) + msg = MIMEText(body, part_type) elif "file" == part_type: msg = generate_file_mime_part(locale=locale) elif "image" == part_type: @@ -229,6 +229,7 @@ def generate_mime_part(locale: str="en_US", def generate_multipart_email(locale: str="en_US", type_list: typing.List[str]=["plain", "html", "image"], + sub_type: str = None, use_short_email: bool=False ) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]: """ @@ -236,9 +237,10 @@ def generate_multipart_email(locale: str="en_US", :param locale: :param type_list: options are plain, html, image (attachment), file (attachment) + :param sub_type: multipart sub type that defaults to "mixed" if not specified :param use_short_email: produces a "To" or "From" that is only the email address if True """ - msg = MIMEMultipart() + msg = MIMEMultipart(sub_type) if sub_type else MIMEMultipart() for part_type in type_list: msg.attach(generate_mime_part(locale=locale, part_type=part_type)) from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email) diff --git a/requirements-no-pinax.txt b/requirements-no-pinax.txt index eeebb7c5..45868a78 100644 --- a/requirements-no-pinax.txt +++ b/requirements-no-pinax.txt @@ -7,7 +7,6 @@ akismet markdown beautifulsoup4 lxml -simplejson pytz six djangorestframework diff --git a/requirements.txt b/requirements.txt index ecf2f1ff..453bef8f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,6 @@ akismet markdown beautifulsoup4 lxml -simplejson pytz pinax_teams djangorestframework