mirror of
https://github.com/django-helpdesk/django-helpdesk.git
synced 2024-12-13 02:10:49 +01:00
Merge pull request #1053 from uhurusurfa/fix_attachment_file_name_handling
Attachment file name handling, fixes email containing comma in "real name" and ignored emails
This commit is contained in:
commit
0d810d6102
1
.gitignore
vendored
1
.gitignore
vendored
@ -12,6 +12,7 @@ docs/doctrees/*
|
||||
.directory
|
||||
*.swp
|
||||
.idea
|
||||
.tox/**
|
||||
|
||||
# ignore demo attachments that user might have added
|
||||
helpdesk/attachments/
|
||||
|
26
.travis.yml
26
.travis.yml
@ -1,26 +0,0 @@
|
||||
language: python
|
||||
dist: focal # use LTS 20.04
|
||||
|
||||
python:
|
||||
- "3.6"
|
||||
- "3.7"
|
||||
- "3.8"
|
||||
|
||||
env:
|
||||
- DJANGO=2.2.23
|
||||
- DJANGO=3.1.11
|
||||
- DJANGO=3.2.3
|
||||
|
||||
install:
|
||||
- pip install -q Django==$DJANGO
|
||||
- pip install -q -r requirements.txt
|
||||
- pip install -q -r requirements-testing.txt
|
||||
|
||||
before_script:
|
||||
- "pycodestyle --exclude=migrations --ignore=E501,W503,W504 helpdesk"
|
||||
|
||||
script:
|
||||
- coverage run --source='.' quicktest.py helpdesk
|
||||
|
||||
after_success:
|
||||
- codecov
|
16
Makefile
16
Makefile
@ -56,6 +56,22 @@ test:
|
||||
mkdir -p var
|
||||
$(PIP) install -e .[test]
|
||||
$(TOX)
|
||||
rm -rf var
|
||||
|
||||
|
||||
|
||||
#: format - Run the PEP8 formatter.
|
||||
.PHONY: format
|
||||
format:
|
||||
autopep8 --exit-code --global-config .flake8 helpdesk
|
||||
isort --line-length=120 --src helpdesk .
|
||||
|
||||
|
||||
#: checkformat - checks formatting against configured format specifications for the project.
|
||||
.PHONY: checkformat
|
||||
checkformat:
|
||||
flake8 helpdesk --count --show-source --statistics --exit-zero --max-complexity=20
|
||||
isort --line-length=120 --src helpdesk . --check
|
||||
|
||||
|
||||
#: documentation - Build documentation (Sphinx, README, ...).
|
||||
|
21
README.rst
21
README.rst
@ -69,9 +69,30 @@ Django project.
|
||||
For further installation information see `docs/install.html`
|
||||
and `docs/configuration.html`
|
||||
|
||||
Developer Environment
|
||||
---------------------
|
||||
|
||||
Follow these steps to set up your development environment to contribute to helpdesk:
|
||||
- install a virtual environment
|
||||
- using virtualenv from the helpdesk base folder do::
|
||||
virtualenv .venv && source .venv/bin/activate
|
||||
|
||||
- install the requirements for development::
|
||||
pip install -r requirements.txt -r requirements-dev.txt
|
||||
|
||||
To see option for the Makefile run: `make`
|
||||
|
||||
The project enforces a standardized formatting in the CI/CD pipeline. To ensure you have the correct formatting run::
|
||||
make checkformat
|
||||
|
||||
To auto format any code use this::
|
||||
make format
|
||||
|
||||
Testing
|
||||
-------
|
||||
|
||||
From the command line you can run the tests using: `make test`
|
||||
|
||||
See `quicktest.py` for usage details.
|
||||
|
||||
Upgrading from previous versions
|
||||
|
@ -7,6 +7,7 @@ See LICENSE for details.
|
||||
|
||||
# import base64
|
||||
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from datetime import timedelta
|
||||
from django.conf import settings as django_settings
|
||||
@ -17,9 +18,11 @@ from django.db.models import Q
|
||||
from django.utils import encoding, timezone
|
||||
from django.utils.translation import gettext as _
|
||||
import email
|
||||
from email.message import Message
|
||||
from email.utils import getaddresses
|
||||
from email_reply_parser import EmailReplyParser
|
||||
from helpdesk import settings
|
||||
from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException
|
||||
from helpdesk.lib import process_attachments, safe_template_context
|
||||
from helpdesk.models import FollowUp, IgnoreEmail, Queue, Ticket
|
||||
import imaplib
|
||||
@ -34,6 +37,7 @@ import ssl
|
||||
import sys
|
||||
from time import ctime
|
||||
import typing
|
||||
from typing import List, Tuple
|
||||
|
||||
|
||||
# import User model, which may be a custom model
|
||||
@ -135,16 +139,23 @@ def pop3_sync(q, logger, server):
|
||||
else:
|
||||
full_message = encoding.force_str(
|
||||
"\n".join(raw_content), errors='replace')
|
||||
ticket = object_from_message(
|
||||
message=full_message, queue=q, logger=logger)
|
||||
|
||||
if ticket:
|
||||
server.dele(msgNum)
|
||||
logger.info(
|
||||
"Successfully processed message %s, deleted from POP3 server" % msgNum)
|
||||
else:
|
||||
try:
|
||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
||||
except IgnoreTicketException:
|
||||
logger.warn(
|
||||
"Message %s was not successfully processed, and will be left on POP3 server" % msgNum)
|
||||
"Message %s was ignored and will be left on POP3 server" % msgNum)
|
||||
except DeleteIgnoredTicketException:
|
||||
logger.warn(
|
||||
"Message %s was ignored and deleted from POP3 server" % msgNum)
|
||||
server.dele(msgNum)
|
||||
else:
|
||||
if ticket:
|
||||
server.dele(msgNum)
|
||||
logger.info(
|
||||
"Successfully processed message %s, deleted from POP3 server" % msgNum)
|
||||
else:
|
||||
logger.warn(
|
||||
"Message %s was not successfully processed, and will be left on POP3 server" % msgNum)
|
||||
|
||||
server.quit()
|
||||
|
||||
@ -186,17 +197,23 @@ def imap_sync(q, logger, server):
|
||||
data = server.fetch(num, '(RFC822)')[1]
|
||||
full_message = encoding.force_str(data[0][1], errors='replace')
|
||||
try:
|
||||
ticket = object_from_message(
|
||||
message=full_message, queue=q, logger=logger)
|
||||
except TypeError:
|
||||
ticket = None # hotfix. Need to work out WHY.
|
||||
if ticket:
|
||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
||||
except IgnoreTicketException:
|
||||
logger.warn("Message %s was ignored and will be left on IMAP server" % num)
|
||||
except DeleteIgnoredTicketException:
|
||||
server.store(num, '+FLAGS', '\\Deleted')
|
||||
logger.info(
|
||||
"Successfully processed message %s, deleted from IMAP server" % num)
|
||||
logger.warn("Message %s was ignored and deleted from IMAP server" % num)
|
||||
except TypeError as te:
|
||||
# Log the error with stacktrace to help identify what went wrong
|
||||
logger.error(f"Unexpected error processing message: {te}", exc_info=True)
|
||||
else:
|
||||
logger.warn(
|
||||
"Message %s was not successfully processed, and will be left on IMAP server" % num)
|
||||
if ticket:
|
||||
server.store(num, '+FLAGS', '\\Deleted')
|
||||
logger.info(
|
||||
"Successfully processed message %s, deleted from IMAP server" % num)
|
||||
else:
|
||||
logger.warn(
|
||||
"Message %s was not successfully processed, and will be left on IMAP server" % num)
|
||||
except imaplib.IMAP4.error:
|
||||
logger.error(
|
||||
"IMAP retrieve failed. Is the folder '%s' spelled correctly, and does it exist on the server?",
|
||||
@ -282,22 +299,28 @@ def process_queue(q, logger):
|
||||
logger.info("Processing message %d" % i)
|
||||
with open(m, 'r') as f:
|
||||
full_message = encoding.force_str(f.read(), errors='replace')
|
||||
ticket = object_from_message(
|
||||
message=full_message, queue=q, logger=logger)
|
||||
if ticket:
|
||||
logger.info(
|
||||
"Successfully processed message %d, ticket/comment created.", i)
|
||||
try:
|
||||
# delete message file if ticket was successful
|
||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
||||
except IgnoreTicketException:
|
||||
logger.warn("Message %d was ignored and will be left in local directory", i)
|
||||
except DeleteIgnoredTicketException:
|
||||
os.unlink(m)
|
||||
except OSError as e:
|
||||
logger.error(
|
||||
"Unable to delete message %d (%s).", i, str(e))
|
||||
logger.warn("Message %d was ignored and deleted local directory", i)
|
||||
else:
|
||||
logger.info("Successfully deleted message %d.", i)
|
||||
else:
|
||||
logger.warn(
|
||||
"Message %d was not successfully processed, and will be left in local directory", i)
|
||||
if ticket:
|
||||
logger.info(
|
||||
"Successfully processed message %d, ticket/comment created.", i)
|
||||
try:
|
||||
# delete message file if ticket was successful
|
||||
os.unlink(m)
|
||||
except OSError as e:
|
||||
logger.error(
|
||||
"Unable to delete message %d (%s).", i, str(e))
|
||||
else:
|
||||
logger.info("Successfully deleted message %d.", i)
|
||||
else:
|
||||
logger.warn(
|
||||
"Message %d was not successfully processed, and will be left in local directory", i)
|
||||
|
||||
|
||||
def decodeUnknown(charset, string):
|
||||
@ -574,37 +597,124 @@ def get_email_body_from_part_payload(part) -> str:
|
||||
)
|
||||
|
||||
|
||||
def attempt_body_extract_from_html(message: str) -> str:
|
||||
mail = BeautifulSoup(str(message), "html.parser")
|
||||
beautiful_body = mail.find('body')
|
||||
body = None
|
||||
full_body = None
|
||||
if beautiful_body:
|
||||
try:
|
||||
body = beautiful_body.text
|
||||
full_body = body
|
||||
except AttributeError:
|
||||
pass
|
||||
if not body:
|
||||
body = ""
|
||||
return body, full_body
|
||||
|
||||
|
||||
def extract_part_data(
|
||||
part: Message,
|
||||
counter: int,
|
||||
ticket_id: int,
|
||||
files: List,
|
||||
logger: logging.Logger
|
||||
) -> Tuple[str, str]:
|
||||
name = part.get_filename()
|
||||
if name:
|
||||
name = email.utils.collapse_rfc2231_value(name)
|
||||
part_body = None
|
||||
part_full_body = None
|
||||
if part.get_content_maintype() == 'text' and name is None:
|
||||
if part.get_content_subtype() == 'plain':
|
||||
part_body = part.get_payload(decode=True)
|
||||
# https://github.com/django-helpdesk/django-helpdesk/issues/732
|
||||
if part['Content-Transfer-Encoding'] == '8bit' and part.get_content_charset() == 'utf-8':
|
||||
part_body = part_body.decode('unicode_escape')
|
||||
part_body = decodeUnknown(part.get_content_charset(), part_body)
|
||||
# have to use django_settings here so overwriting it works in tests
|
||||
# the default value is False anyway
|
||||
if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False):
|
||||
# first message in thread, we save full body to avoid
|
||||
# losing forwards and things like that
|
||||
part_full_body = get_body_from_fragments(part_body)
|
||||
part_body = EmailReplyParser.parse_reply(part_body)
|
||||
else:
|
||||
# second and other reply, save only first part of the
|
||||
# message
|
||||
part_body = EmailReplyParser.parse_reply(part_body)
|
||||
part_full_body = part_body
|
||||
# workaround to get unicode text out rather than escaped text
|
||||
part_body = get_encoded_body(part_body)
|
||||
logger.debug("Discovered plain text MIME part")
|
||||
else:
|
||||
email_body = get_email_body_from_part_payload(part)
|
||||
|
||||
if not part_body and not part_full_body:
|
||||
# no text has been parsed so far - try such deep parsing
|
||||
# for some messages
|
||||
altered_body = email_body.replace(
|
||||
"</p>", "</p>\n").replace("<br", "\n<br")
|
||||
mail = BeautifulSoup(str(altered_body), "html.parser")
|
||||
part_full_body = mail.get_text()
|
||||
|
||||
if "<body" not in email_body:
|
||||
email_body = f"<body>{email_body}</body>"
|
||||
|
||||
payload = (
|
||||
'<html>'
|
||||
'<head>'
|
||||
'<meta charset="utf-8" />'
|
||||
'</head>'
|
||||
'%s'
|
||||
'</html>'
|
||||
) % email_body
|
||||
files.append(
|
||||
SimpleUploadedFile(
|
||||
_("email_html_body.html"), payload.encode("utf-8"), 'text/html')
|
||||
)
|
||||
logger.debug("Discovered HTML MIME part")
|
||||
else:
|
||||
if not name:
|
||||
ext = mimetypes.guess_extension(part.get_content_type())
|
||||
name = f"part-{counter}{ext}"
|
||||
else:
|
||||
name = f"part-{counter}_{name}"
|
||||
|
||||
files.append(SimpleUploadedFile(name, part.get_payload(decode=True), mimetypes.guess_type(name)[0]))
|
||||
logger.debug("Found MIME attachment %s", name)
|
||||
return part_body, part_full_body
|
||||
|
||||
|
||||
def object_from_message(message: str,
|
||||
queue: Queue,
|
||||
logger: logging.Logger
|
||||
) -> Ticket:
|
||||
# 'message' must be an RFC822 formatted message.
|
||||
message = email.message_from_string(message)
|
||||
# 'message' must be an RFC822 formatted message to correctly parse.
|
||||
message_obj = email.message_from_string(message)
|
||||
|
||||
subject = message.get('subject', _('Comment from e-mail'))
|
||||
subject = message_obj.get('subject', _('Comment from e-mail'))
|
||||
subject = decode_mail_headers(
|
||||
decodeUnknown(message.get_charset(), subject))
|
||||
decodeUnknown(message_obj.get_charset(), subject))
|
||||
for affix in STRIPPED_SUBJECT_STRINGS:
|
||||
subject = subject.replace(affix, "")
|
||||
subject = subject.strip()
|
||||
|
||||
sender = message.get('from', _('Unknown Sender'))
|
||||
sender = decode_mail_headers(decodeUnknown(message.get_charset(), sender))
|
||||
|
||||
# to address bug #832, we wrap all the text in front of the email address in
|
||||
# double quotes by using replace() on the email string. Then,
|
||||
# take first item of list, second item of tuple is the actual email address.
|
||||
# Note that the replace won't work on just an email with no real name,
|
||||
# but the getaddresses() function seems to be able to handle just unclosed quotes
|
||||
# correctly. Not ideal, but this seems to work for now.
|
||||
sender_email = email.utils.getaddresses(
|
||||
['\"' + sender.replace('<', '\" <')])[0][1]
|
||||
# TODO: Should really be assigning a properly formatted fake email.
|
||||
# Check if anything relies on this being a "real name" formatted string if no sender is found on message_obj.
|
||||
# Also not sure it should be accepting emails from unknown senders
|
||||
sender_email = _('Unknown Sender')
|
||||
sender_hdr = message_obj.get('from')
|
||||
if sender_hdr:
|
||||
# Parse the header which extracts the first email address in the list if more than one
|
||||
# The parseaddr method returns a tuple in the form <real name> <email address>
|
||||
# Only need the actual email address from the tuple not the "real name"
|
||||
# Since the spec requires that all email addresses are ASCII, they will not be encoded
|
||||
sender_email = email.utils.parseaddr(sender_hdr)[1]
|
||||
|
||||
for ignore in IgnoreEmail.objects.filter(Q(queues=queue) | Q(queues__isnull=True)):
|
||||
if ignore.test(sender_email):
|
||||
# By returning 'False' the message will be kept in the mailbox,
|
||||
# and the 'True' will cause the message to be deleted.
|
||||
return not ignore.keep_in_mailbox
|
||||
raise IgnoreTicketException() if ignore.keep_in_mailbox else DeleteIgnoredTicketException()
|
||||
|
||||
ticket_id: typing.Optional[int] = get_ticket_id_from_subject_slug(
|
||||
queue.slug,
|
||||
@ -617,108 +727,23 @@ def object_from_message(message: str,
|
||||
counter = 0
|
||||
files = []
|
||||
|
||||
for part in message.walk():
|
||||
for part in message_obj.walk():
|
||||
if part.get_content_maintype() == 'multipart':
|
||||
continue
|
||||
|
||||
name = part.get_param("name")
|
||||
if name:
|
||||
name = email.utils.collapse_rfc2231_value(name)
|
||||
|
||||
if part.get_content_maintype() == 'text' and name is None:
|
||||
if part.get_content_subtype() == 'plain':
|
||||
body = part.get_payload(decode=True)
|
||||
# https://github.com/django-helpdesk/django-helpdesk/issues/732
|
||||
if part['Content-Transfer-Encoding'] == '8bit' and part.get_content_charset() == 'utf-8':
|
||||
body = body.decode('unicode_escape')
|
||||
body = decodeUnknown(part.get_content_charset(), body)
|
||||
# have to use django_settings here so overwritting it works in tests
|
||||
# the default value is False anyway
|
||||
if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False):
|
||||
# first message in thread, we save full body to avoid
|
||||
# losing forwards and things like that
|
||||
full_body = get_body_from_fragments(body)
|
||||
body = EmailReplyParser.parse_reply(body)
|
||||
else:
|
||||
# second and other reply, save only first part of the
|
||||
# message
|
||||
body = EmailReplyParser.parse_reply(body)
|
||||
full_body = body
|
||||
# workaround to get unicode text out rather than escaped text
|
||||
body = get_encoded_body(body)
|
||||
logger.debug("Discovered plain text MIME part")
|
||||
else:
|
||||
email_body = get_email_body_from_part_payload(part)
|
||||
|
||||
if not body and not full_body:
|
||||
# no text has been parsed so far - try such deep parsing
|
||||
# for some messages
|
||||
altered_body = email_body.replace(
|
||||
"</p>", "</p>\n").replace("<br", "\n<br")
|
||||
mail = BeautifulSoup(str(altered_body), "html.parser")
|
||||
full_body = mail.get_text()
|
||||
|
||||
if "<body" not in email_body:
|
||||
email_body = f"<body>{email_body}</body>"
|
||||
|
||||
payload = (
|
||||
'<html>'
|
||||
'<head>'
|
||||
'<meta charset="utf-8" />'
|
||||
'</head>'
|
||||
'%s'
|
||||
'</html>'
|
||||
) % email_body
|
||||
files.append(
|
||||
SimpleUploadedFile(
|
||||
_("email_html_body.html"), payload.encode("utf-8"), 'text/html')
|
||||
)
|
||||
logger.debug("Discovered HTML MIME part")
|
||||
else:
|
||||
if not name:
|
||||
ext = mimetypes.guess_extension(part.get_content_type())
|
||||
name = "part-%i%s" % (counter, ext)
|
||||
else:
|
||||
name = ("part-%i_" % counter) + name
|
||||
|
||||
# # FIXME: this code gets the paylods, then does something with it and then completely ignores it
|
||||
# # writing the part.get_payload(decode=True) instead; and then the payload variable is
|
||||
# # replaced by some dict later.
|
||||
# # the `payloadToWrite` has been also ignored so was commented
|
||||
# payload = part.get_payload()
|
||||
# if isinstance(payload, list):
|
||||
# payload = payload.pop().as_string()
|
||||
# # payloadToWrite = payload
|
||||
# # check version of python to ensure use of only the correct error type
|
||||
# non_b64_err = TypeError
|
||||
# try:
|
||||
# logger.debug("Try to base64 decode the attachment payload")
|
||||
# # payloadToWrite = base64.decodebytes(payload)
|
||||
# except non_b64_err:
|
||||
# logger.debug("Payload was not base64 encoded, using raw bytes")
|
||||
# # payloadToWrite = payload
|
||||
files.append(SimpleUploadedFile(name, part.get_payload(
|
||||
decode=True), mimetypes.guess_type(name)[0]))
|
||||
logger.debug("Found MIME attachment %s" % name)
|
||||
|
||||
# See email.message_obj.Message.get_filename()
|
||||
part_body, part_full_body = extract_part_data(part, counter, ticket_id, files, logger)
|
||||
if part_body:
|
||||
body = part_body
|
||||
full_body = part_full_body
|
||||
counter += 1
|
||||
|
||||
if not body:
|
||||
mail = BeautifulSoup(str(message), "html.parser")
|
||||
beautiful_body = mail.find('body')
|
||||
if beautiful_body:
|
||||
try:
|
||||
body = beautiful_body.text
|
||||
full_body = body
|
||||
except AttributeError:
|
||||
pass
|
||||
if not body:
|
||||
body = ""
|
||||
body, full_body = attempt_body_extract_from_html(message_obj)
|
||||
|
||||
add_file_if_always_save_incoming_email_message(files, message)
|
||||
add_file_if_always_save_incoming_email_message(files, message_obj)
|
||||
|
||||
smtp_priority = message.get('priority', '')
|
||||
smtp_importance = message.get('importance', '')
|
||||
smtp_priority = message_obj.get('priority', '')
|
||||
smtp_importance = message_obj.get('importance', '')
|
||||
high_priority_types = {'high', 'important', '1', 'urgent'}
|
||||
priority = 2 if high_priority_types & {
|
||||
smtp_priority, smtp_importance} else 3
|
||||
@ -733,4 +758,4 @@ def object_from_message(message: str,
|
||||
'files': files,
|
||||
}
|
||||
|
||||
return create_object_from_email_message(message, ticket_id, payload, files, logger=logger)
|
||||
return create_object_from_email_message(message_obj, ticket_id, payload, files, logger=logger)
|
||||
|
13
helpdesk/exceptions.py
Normal file
13
helpdesk/exceptions.py
Normal file
@ -0,0 +1,13 @@
|
||||
class IgnoreTicketException(Exception):
|
||||
"""
|
||||
Raised when an email message is received from a sender who is marked to be ignored
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class DeleteIgnoredTicketException(Exception):
|
||||
"""
|
||||
Raised when an email message is received from a sender who is marked to be ignored
|
||||
and the record is tagged to delete the email from the inbox
|
||||
"""
|
||||
pass
|
@ -1,13 +1,18 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.core.management import call_command
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.test import override_settings, TestCase
|
||||
import helpdesk.email
|
||||
from helpdesk.email import extract_part_data, object_from_message
|
||||
from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException
|
||||
from helpdesk.management.commands.get_email import Command
|
||||
from helpdesk.models import FollowUp, FollowUpAttachment, Queue, Ticket, TicketCC
|
||||
from helpdesk.models import FollowUp, FollowUpAttachment, IgnoreEmail, Queue, Ticket, TicketCC
|
||||
from helpdesk.tests import utils
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
@ -15,6 +20,7 @@ from shutil import rmtree
|
||||
import six
|
||||
import sys
|
||||
from tempfile import mkdtemp
|
||||
import typing
|
||||
from unittest import mock
|
||||
|
||||
|
||||
@ -42,7 +48,7 @@ class GetEmailCommonTests(TestCase):
|
||||
with mock.patch.object(Command, 'handle', return_value=None) as mocked_handle:
|
||||
call_command('get_email', quiet=quiet_test_value)
|
||||
mocked_handle.assert_called_once()
|
||||
for args, kwargs in mocked_handle.call_args_list:
|
||||
for _, kwargs in mocked_handle.call_args_list:
|
||||
self.assertEqual(quiet_test_value, (kwargs['quiet']))
|
||||
|
||||
def test_email_with_blank_body_and_attachment(self):
|
||||
@ -132,6 +138,40 @@ class GetEmailCommonTests(TestCase):
|
||||
assert "Hello there!" in FollowUp.objects.filter(
|
||||
ticket=ticket).first().comment
|
||||
|
||||
def test_will_delete_ignored_email(self):
|
||||
"""
|
||||
Tests if an email will be ignored if configured to do so and throws the correct exception
|
||||
to ensure the email is deleted
|
||||
"""
|
||||
message, from_meta, _ = utils.generate_text_email(locale="es_ES")
|
||||
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=False)
|
||||
ignore.save()
|
||||
with self.assertRaises(DeleteIgnoredTicketException):
|
||||
object_from_message(message.as_string(), self.queue_public, self.logger);
|
||||
|
||||
def test_will_not_delete_ignored_email(self):
|
||||
"""
|
||||
Tests if an email will be ignored if configured to do so and throws the correct exception
|
||||
to ensure the email is NOT deleted
|
||||
"""
|
||||
message, from_meta, _ = utils.generate_text_email(locale="es_ES")
|
||||
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=True)
|
||||
ignore.save()
|
||||
with self.assertRaises(IgnoreTicketException):
|
||||
object_from_message(message.as_string(), self.queue_public, self.logger);
|
||||
|
||||
def test_utf8_filename_attachment(self):
|
||||
"""
|
||||
Tests if an attachment correctly sent with a UTF8 filename in disposition is extracted correctly
|
||||
"""
|
||||
filename = "TeléfonoMañana.txt"
|
||||
part = utils.generate_file_mime_part(locale="es_ES", filename=filename)
|
||||
files: typing.List[SimpleUploadedFile] = []
|
||||
extract_part_data(part, counter=1, ticket_id="tst1", files=files, logger=self.logger)
|
||||
sent_file: SimpleUploadedFile = files[0]
|
||||
# The extractor prepends a part identifier so compare the ending
|
||||
self.assertTrue(sent_file.name.endswith(filename), f"Filename extracted does not match: {sent_file.name}")
|
||||
|
||||
|
||||
class GetEmailParametricTemplate(object):
|
||||
"""TestCase that checks basic email functionality across methods and socks configs."""
|
||||
@ -258,12 +298,12 @@ class GetEmailParametricTemplate(object):
|
||||
"""Tests correctly decoding mail headers when a comma is encoded into
|
||||
UTF-8. See bug report #832."""
|
||||
|
||||
# example email text from Django docs:
|
||||
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
||||
test_email_from = "Bernard-Bouissières, Benjamin <bbb@example.com>"
|
||||
# Create the from using standard RFC required formats
|
||||
# Override the last_name to ensure we get a non-ascii character in it
|
||||
test_email_from_meta = utils.generate_email_address("fr_FR", last_name_override="Bouissières")
|
||||
test_email_subject = "Commas in From lines"
|
||||
test_email_body = "Testing commas in from email UTF-8."
|
||||
test_email = "To: helpdesk@example.com\nFrom: " + test_email_from + \
|
||||
test_email = "To: helpdesk@example.com\nFrom: " + test_email_from_meta[0] + \
|
||||
"\nSubject: " + test_email_subject + "\n\n" + test_email_body
|
||||
test_mail_len = len(test_email)
|
||||
|
||||
@ -333,13 +373,13 @@ class GetEmailParametricTemplate(object):
|
||||
|
||||
ticket1 = get_object_or_404(Ticket, pk=1)
|
||||
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
||||
self.assertEqual(ticket1.submitter_email, 'bbb@example.com')
|
||||
self.assertEqual(ticket1.submitter_email, test_email_from_meta[1])
|
||||
self.assertEqual(ticket1.title, test_email_subject)
|
||||
self.assertEqual(ticket1.description, test_email_body)
|
||||
|
||||
ticket2 = get_object_or_404(Ticket, pk=2)
|
||||
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
||||
self.assertEqual(ticket2.submitter_email, 'bbb@example.com')
|
||||
self.assertEqual(ticket2.submitter_email, test_email_from_meta[1])
|
||||
self.assertEqual(ticket2.title, test_email_subject)
|
||||
self.assertEqual(ticket2.description, test_email_body)
|
||||
|
||||
|
270
helpdesk/tests/utils.py
Normal file
270
helpdesk/tests/utils.py
Normal file
@ -0,0 +1,270 @@
|
||||
"""UItility functions facilitate making unit testing easier and less brittle."""
|
||||
|
||||
|
||||
from PIL import Image
|
||||
import email
|
||||
from email import encoders
|
||||
from email.message import Message
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
import factory
|
||||
import faker
|
||||
from io import BytesIO
|
||||
from numpy.random import randint
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import typing
|
||||
from typing import Any, Optional, Tuple
|
||||
import unicodedata
|
||||
|
||||
|
||||
def strip_accents(text):
|
||||
"""
|
||||
Strip accents from input String. (only works on Pythin 3
|
||||
|
||||
:param text: The input string.
|
||||
:type text: String.
|
||||
|
||||
:returns: The processed String.
|
||||
:rtype: String.
|
||||
"""
|
||||
text = unicodedata.normalize('NFD', text)
|
||||
text = text.encode('ascii', 'ignore')
|
||||
text = text.decode("utf-8")
|
||||
return str(text)
|
||||
|
||||
|
||||
def text_to_id(text):
|
||||
"""
|
||||
Convert input text to id.
|
||||
|
||||
:param text: The input string.
|
||||
:type text: String.
|
||||
|
||||
:returns: The processed String.
|
||||
:rtype: String.
|
||||
"""
|
||||
text = strip_accents(text.lower())
|
||||
text = re.sub('[ ]+', '_', text)
|
||||
text = re.sub('[^0-9a-zA-Z_-]', '', text)
|
||||
return text
|
||||
|
||||
|
||||
def get_random_string(length: int=16) -> str:
|
||||
return "".join(
|
||||
[random.choice(string.ascii_letters + string.digits) for _ in range(length)]
|
||||
)
|
||||
|
||||
|
||||
def generate_random_image(image_format, array_dims):
|
||||
"""
|
||||
Creates an image from a random array.
|
||||
|
||||
:param image_format: An image format (PNG or JPEG).
|
||||
:param array_dims: A tuple with array dimensions.
|
||||
|
||||
:returns: A byte string with encoded image
|
||||
:rtype: bytes
|
||||
"""
|
||||
image_bytes = randint(low=0, high=255, size=array_dims, dtype='uint8')
|
||||
io = BytesIO()
|
||||
image_pil = Image.fromarray(image_bytes)
|
||||
image_pil.save(io, image_format, subsampling=0, quality=100)
|
||||
return io.getvalue()
|
||||
|
||||
|
||||
def get_random_image(image_format: str="PNG", size: int=5):
|
||||
"""
|
||||
Returns a random image.
|
||||
|
||||
Args:
|
||||
image_format: An image format (PNG or JPEG).
|
||||
|
||||
Returns:
|
||||
A string with encoded image
|
||||
"""
|
||||
return generate_random_image(image_format, (size, size, 3))
|
||||
|
||||
|
||||
def get_fake(provider: str, locale: str = "en_US", min_length: int = 5) -> Any:
|
||||
"""
|
||||
Generates a random string, float, integer etc based on provider
|
||||
Provider can be "text', 'sentence', "word"
|
||||
e.g. `get_fake('name')` ==> 'Buzz Aldrin'
|
||||
"""
|
||||
string = factory.Faker(provider).evaluate({}, None, {'locale': locale,})
|
||||
while len(string) < min_length:
|
||||
string += factory.Faker(provider).evaluate({}, None, {'locale': locale,})
|
||||
return string
|
||||
|
||||
|
||||
def get_fake_html(locale: str = "en_US", wrap_in_body_tag=True) -> Any:
|
||||
"""
|
||||
Generates a random string, float, integer etc based on provider
|
||||
Provider can be "text', 'sentence',
|
||||
e.g. `get_fake('name')` ==> 'Buzz Aldrin'
|
||||
"""
|
||||
html = factory.Faker("sentence").evaluate({}, None, {'locale': locale,})
|
||||
for _ in range(0,4):
|
||||
html += "<li>" + factory.Faker("sentence").evaluate({}, None, {'locale': locale,}) + "</li>"
|
||||
for _ in range(0,4):
|
||||
html += "<p>" + factory.Faker("text").evaluate({}, None, {'locale': locale,})
|
||||
return f"<body>{html}</body>" if wrap_in_body_tag else html
|
||||
|
||||
def generate_email_address(
|
||||
locale: str="en_US",
|
||||
use_short_email: bool=False,
|
||||
real_name_format: Optional[str]="{last_name}, {first_name}",
|
||||
last_name_override: Optional[str]=None) -> Tuple[str, str, str, str]:
|
||||
'''
|
||||
Generate an RFC 2822 email address
|
||||
|
||||
:param locale: change this to generate locale specific names
|
||||
:param use_short_email: defaults to false. If true then does not include real name in email address
|
||||
:param real_name_format: pass a different format if different than "{last_name}, {first_name}"
|
||||
:param last_name_override: override the fake name if you want some special characters in the last name
|
||||
:returns <RFC2822 formatted email for header>, <short email address>, <first name>, <last_name
|
||||
'''
|
||||
fake = faker.Faker(locale=locale)
|
||||
first_name = fake.first_name()
|
||||
last_name = last_name_override or fake.last_name()
|
||||
real_name = None if use_short_email else real_name_format.format(first_name=first_name, last_name=last_name)
|
||||
# Add a random string to ensure we do not generate a real domain name
|
||||
email_address = "{}.{}@{}".format(
|
||||
first_name.replace(' ', '').encode("ascii", "ignore").lower().decode(),
|
||||
last_name.replace(' ', '').encode("ascii", "ignore").lower().decode(),
|
||||
get_random_string(5) + fake.domain_name()
|
||||
)
|
||||
# format email address for RFC 2822 and return
|
||||
return email.utils.formataddr((real_name, email_address)), email_address, first_name, last_name
|
||||
|
||||
def generate_file_mime_part(locale: str="en_US",filename: str = None) -> Message:
|
||||
"""
|
||||
|
||||
:param locale: change this to generate locale specific file name and attachment content
|
||||
:param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated
|
||||
"""
|
||||
part = MIMEBase('application', 'octet-stream')
|
||||
part.set_payload(get_fake("text", locale=locale, min_length=1024))
|
||||
encoders.encode_base64(part)
|
||||
if not filename:
|
||||
filename = get_fake("word", locale=locale, min_length=8) + ".txt"
|
||||
part.add_header('Content-Disposition', "attachment; filename= %s" % filename)
|
||||
return part
|
||||
|
||||
def generate_image_mime_part(locale: str="en_US",imagename: str = None) -> Message:
|
||||
"""
|
||||
|
||||
:param locale: change this to generate locale specific file name and attachment content
|
||||
:param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated
|
||||
"""
|
||||
part = MIMEImage(generate_random_image(image_format="JPEG"))
|
||||
part.set_payload(get_fake("text", locale=locale, min_length=1024))
|
||||
encoders.encode_base64(part)
|
||||
if not imagename:
|
||||
imagename = get_fake("word", locale=locale, min_length=8) + ".jpg"
|
||||
part.add_header('Content-Disposition', "attachment; filename= %s" % imagename)
|
||||
return part
|
||||
|
||||
def generate_email_list(address_cnt: int = 3,
|
||||
locale: str="en_US",
|
||||
use_short_email: bool=False
|
||||
) -> str:
|
||||
"""
|
||||
Generates a list of email addresses formatted for email headers on a Mime part
|
||||
|
||||
:param address_cnt: the number of email addresses to string together
|
||||
:param locale: change this to generate locale specific "real names" and subject
|
||||
:param use_short_email: produces a email address without "real name" if True
|
||||
|
||||
"""
|
||||
email_address_list = [generate_email_address(locale, use_short_email=use_short_email)[0] for _ in range(0, address_cnt)]
|
||||
return ",".join(email_address_list)
|
||||
|
||||
def add_simple_email_headers(message: Message, locale: str="en_US",
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Adds the key email headers to a Mime part
|
||||
|
||||
:param message: the Mime part to add headers to
|
||||
:param locale: change this to generate locale specific "real names" and subject
|
||||
:param use_short_email: produces a "To" or "From" that is only the email address if True
|
||||
|
||||
"""
|
||||
to_meta = generate_email_address(locale, use_short_email=use_short_email)
|
||||
from_meta = generate_email_address(locale, use_short_email=use_short_email)
|
||||
|
||||
message['Subject'] = get_fake("sentence", locale=locale)
|
||||
message['From'] = from_meta[0]
|
||||
message['To'] = to_meta[0]
|
||||
return from_meta, to_meta
|
||||
|
||||
def generate_mime_part(locale: str="en_US",
|
||||
part_type: str="plain",
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Generates amime part of the sepecified type
|
||||
|
||||
:param locale: change this to generate locale specific strings
|
||||
:param text_type: options are plain, html, image (attachment), file (attachment)
|
||||
:param use_short_email: produces a "To" or "From" that is only the email address if True
|
||||
"""
|
||||
if "plain" == part_type:
|
||||
body = get_fake("text", locale=locale, min_length=1024)
|
||||
msg = MIMEText(body)
|
||||
elif "html" == part_type:
|
||||
body = get_fake_html(locale=locale, wrap_in_body_tag=True)
|
||||
msg = MIMEText(body)
|
||||
elif "file" == part_type:
|
||||
msg = generate_file_mime_part(locale=locale)
|
||||
msg = MIMEText(body)
|
||||
elif "image" == part_type:
|
||||
msg = generate_image_mime_part(locale=locale)
|
||||
msg = MIMEText(body)
|
||||
else:
|
||||
raise Exception("Mime part not implemented: " + part_type)
|
||||
return msg
|
||||
|
||||
def generate_multipart_email(locale: str="en_US",
|
||||
type_list: typing.List[str]=["plain", "html", "attachment"],
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Generates an email including headers with the defined multiparts
|
||||
|
||||
:param locale:
|
||||
:param type_list: options are plain, html, image (attachment), file (attachment)
|
||||
:param use_short_email: produces a "To" or "From" that is only the email address if True
|
||||
"""
|
||||
msg = MIMEMultipart()
|
||||
for part_type in type_list:
|
||||
msg.append(generate_mime_part(locale=locale, part_type=part_type, use_short_email=use_short_email))
|
||||
from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email)
|
||||
return msg, from_meta, to_meta
|
||||
|
||||
def generate_text_email(locale: str="en_US",
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Generates an email including headers
|
||||
"""
|
||||
body = get_fake("text", locale=locale, min_length=1024)
|
||||
msg = MIMEText(body)
|
||||
from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email)
|
||||
return msg, from_meta, to_meta
|
||||
|
||||
def generate_html_email(locale: str="en_US",
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Generates an email including headers
|
||||
"""
|
||||
body = get_fake_html(locale=locale)
|
||||
msg = MIMEText(body)
|
||||
from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email)
|
||||
return msg, from_meta, to_meta
|
5
requirements-dev.txt
Normal file
5
requirements-dev.txt
Normal file
@ -0,0 +1,5 @@
|
||||
tox
|
||||
autopep8
|
||||
flake8
|
||||
pycodestyle
|
||||
isort
|
@ -7,3 +7,6 @@ pbr
|
||||
mock
|
||||
freezegun
|
||||
isort
|
||||
numpy
|
||||
factory_boy
|
||||
faker
|
Loading…
Reference in New Issue
Block a user