forked from extern/django-helpdesk
Merge branch 'main' into fix-warnings
This commit is contained in:
commit
930fc71d8b
1
.gitignore
vendored
1
.gitignore
vendored
@ -12,6 +12,7 @@ docs/doctrees/*
|
||||
.directory
|
||||
*.swp
|
||||
.idea
|
||||
.tox/**
|
||||
|
||||
# ignore demo attachments that user might have added
|
||||
helpdesk/attachments/
|
||||
|
26
.travis.yml
26
.travis.yml
@ -1,26 +0,0 @@
|
||||
language: python
|
||||
dist: focal # use LTS 20.04
|
||||
|
||||
python:
|
||||
- "3.6"
|
||||
- "3.7"
|
||||
- "3.8"
|
||||
|
||||
env:
|
||||
- DJANGO=2.2.23
|
||||
- DJANGO=3.1.11
|
||||
- DJANGO=3.2.3
|
||||
|
||||
install:
|
||||
- pip install -q Django==$DJANGO
|
||||
- pip install -q -r requirements.txt
|
||||
- pip install -q -r requirements-testing.txt
|
||||
|
||||
before_script:
|
||||
- "pycodestyle --exclude=migrations --ignore=E501,W503,W504 helpdesk"
|
||||
|
||||
script:
|
||||
- coverage run --source='.' quicktest.py helpdesk
|
||||
|
||||
after_success:
|
||||
- codecov
|
16
Makefile
16
Makefile
@ -56,6 +56,22 @@ test:
|
||||
mkdir -p var
|
||||
$(PIP) install -e .[test]
|
||||
$(TOX)
|
||||
rm -rf var
|
||||
|
||||
|
||||
|
||||
#: format - Run the PEP8 formatter.
|
||||
.PHONY: format
|
||||
format:
|
||||
autopep8 --exit-code --global-config .flake8 helpdesk
|
||||
isort --line-length=120 --src helpdesk .
|
||||
|
||||
|
||||
#: checkformat - checks formatting against configured format specifications for the project.
|
||||
.PHONY: checkformat
|
||||
checkformat:
|
||||
flake8 helpdesk --count --show-source --statistics --exit-zero --max-complexity=20
|
||||
isort --line-length=120 --src helpdesk . --check
|
||||
|
||||
|
||||
#: documentation - Build documentation (Sphinx, README, ...).
|
||||
|
24
README.rst
24
README.rst
@ -17,9 +17,6 @@ contributors reaching far beyond Jutda.
|
||||
Complete documentation is available in the docs/ directory,
|
||||
or online at http://django-helpdesk.readthedocs.org/.
|
||||
|
||||
You can see a demo installation at https://django-helpdesk-demo.herokuapp.com/,
|
||||
or run a demo locally in just a couple steps!
|
||||
|
||||
Demo Quickstart
|
||||
---------------
|
||||
|
||||
@ -69,9 +66,30 @@ Django project.
|
||||
For further installation information see `docs/install.html`
|
||||
and `docs/configuration.html`
|
||||
|
||||
Developer Environment
|
||||
---------------------
|
||||
|
||||
Follow these steps to set up your development environment to contribute to helpdesk:
|
||||
- install a virtual environment
|
||||
- using virtualenv from the helpdesk base folder do::
|
||||
virtualenv .venv && source .venv/bin/activate
|
||||
|
||||
- install the requirements for development::
|
||||
pip install -r requirements.txt -r requirements-dev.txt
|
||||
|
||||
To see option for the Makefile run: `make`
|
||||
|
||||
The project enforces a standardized formatting in the CI/CD pipeline. To ensure you have the correct formatting run::
|
||||
make checkformat
|
||||
|
||||
To auto format any code use this::
|
||||
make format
|
||||
|
||||
Testing
|
||||
-------
|
||||
|
||||
From the command line you can run the tests using: `make test`
|
||||
|
||||
See `quicktest.py` for usage details.
|
||||
|
||||
Upgrading from previous versions
|
||||
|
@ -7,6 +7,7 @@ See LICENSE for details.
|
||||
|
||||
# import base64
|
||||
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from datetime import timedelta
|
||||
from django.conf import settings as django_settings
|
||||
@ -17,9 +18,11 @@ from django.db.models import Q
|
||||
from django.utils import encoding, timezone
|
||||
from django.utils.translation import gettext as _
|
||||
import email
|
||||
from email.message import Message
|
||||
from email.utils import getaddresses
|
||||
from email_reply_parser import EmailReplyParser
|
||||
from helpdesk import settings
|
||||
from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException
|
||||
from helpdesk.lib import process_attachments, safe_template_context
|
||||
from helpdesk.models import FollowUp, IgnoreEmail, Queue, Ticket
|
||||
import imaplib
|
||||
@ -34,6 +37,7 @@ import ssl
|
||||
import sys
|
||||
from time import ctime
|
||||
import typing
|
||||
from typing import List, Tuple
|
||||
|
||||
|
||||
# import User model, which may be a custom model
|
||||
@ -135,9 +139,16 @@ def pop3_sync(q, logger, server):
|
||||
else:
|
||||
full_message = encoding.force_str(
|
||||
"\n".join(raw_content), errors='replace')
|
||||
ticket = object_from_message(
|
||||
message=full_message, queue=q, logger=logger)
|
||||
|
||||
try:
|
||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
||||
except IgnoreTicketException:
|
||||
logger.warn(
|
||||
"Message %s was ignored and will be left on POP3 server" % msgNum)
|
||||
except DeleteIgnoredTicketException:
|
||||
logger.warn(
|
||||
"Message %s was ignored and deleted from POP3 server" % msgNum)
|
||||
server.dele(msgNum)
|
||||
else:
|
||||
if ticket:
|
||||
server.dele(msgNum)
|
||||
logger.info(
|
||||
@ -186,10 +197,16 @@ def imap_sync(q, logger, server):
|
||||
data = server.fetch(num, '(RFC822)')[1]
|
||||
full_message = encoding.force_str(data[0][1], errors='replace')
|
||||
try:
|
||||
ticket = object_from_message(
|
||||
message=full_message, queue=q, logger=logger)
|
||||
except TypeError:
|
||||
ticket = None # hotfix. Need to work out WHY.
|
||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
||||
except IgnoreTicketException:
|
||||
logger.warn("Message %s was ignored and will be left on IMAP server" % num)
|
||||
except DeleteIgnoredTicketException:
|
||||
server.store(num, '+FLAGS', '\\Deleted')
|
||||
logger.warn("Message %s was ignored and deleted from IMAP server" % num)
|
||||
except TypeError as te:
|
||||
# Log the error with stacktrace to help identify what went wrong
|
||||
logger.error(f"Unexpected error processing message: {te}", exc_info=True)
|
||||
else:
|
||||
if ticket:
|
||||
server.store(num, '+FLAGS', '\\Deleted')
|
||||
logger.info(
|
||||
@ -282,8 +299,14 @@ def process_queue(q, logger):
|
||||
logger.info("Processing message %d" % i)
|
||||
with open(m, 'r') as f:
|
||||
full_message = encoding.force_str(f.read(), errors='replace')
|
||||
ticket = object_from_message(
|
||||
message=full_message, queue=q, logger=logger)
|
||||
try:
|
||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
||||
except IgnoreTicketException:
|
||||
logger.warn("Message %d was ignored and will be left in local directory", i)
|
||||
except DeleteIgnoredTicketException:
|
||||
os.unlink(m)
|
||||
logger.warn("Message %d was ignored and deleted local directory", i)
|
||||
else:
|
||||
if ticket:
|
||||
logger.info(
|
||||
"Successfully processed message %d, ticket/comment created.", i)
|
||||
@ -455,7 +478,11 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
|
||||
|
||||
logger.info("[%s-%s] %s" % (ticket.queue.slug, ticket.id, ticket.title,))
|
||||
|
||||
try:
|
||||
attached = process_attachments(f, files)
|
||||
except ValidationError as e:
|
||||
logger.error(str(e))
|
||||
else:
|
||||
for att_file in attached:
|
||||
logger.info(
|
||||
"Attachment '%s' (with size %s) successfully added to ticket from email.",
|
||||
@ -472,6 +499,11 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
|
||||
logger.info(
|
||||
"Message seems to be auto-reply, not sending any emails back to the sender")
|
||||
else:
|
||||
send_info_email(message_id, f, ticket, context, queue, new)
|
||||
return ticket
|
||||
|
||||
|
||||
def send_info_email(message_id: str, f: FollowUp, ticket: Ticket, context: dict, queue: dict, new: bool):
|
||||
# send mail to appropriate people now depending on what objects
|
||||
# were created and who was CC'd
|
||||
# Add auto-reply headers because it's an auto-reply and we must
|
||||
@ -504,8 +536,6 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
|
||||
extra_headers=extra_headers,
|
||||
)
|
||||
|
||||
return ticket
|
||||
|
||||
|
||||
def get_ticket_id_from_subject_slug(
|
||||
queue_slug: str,
|
||||
@ -574,89 +604,66 @@ def get_email_body_from_part_payload(part) -> str:
|
||||
)
|
||||
|
||||
|
||||
def object_from_message(message: str,
|
||||
queue: Queue,
|
||||
logger: logging.Logger
|
||||
) -> Ticket:
|
||||
# 'message' must be an RFC822 formatted message.
|
||||
message = email.message_from_string(message)
|
||||
|
||||
subject = message.get('subject', _('Comment from e-mail'))
|
||||
subject = decode_mail_headers(
|
||||
decodeUnknown(message.get_charset(), subject))
|
||||
for affix in STRIPPED_SUBJECT_STRINGS:
|
||||
subject = subject.replace(affix, "")
|
||||
subject = subject.strip()
|
||||
|
||||
sender = message.get('from', _('Unknown Sender'))
|
||||
sender = decode_mail_headers(decodeUnknown(message.get_charset(), sender))
|
||||
|
||||
# to address bug #832, we wrap all the text in front of the email address in
|
||||
# double quotes by using replace() on the email string. Then,
|
||||
# take first item of list, second item of tuple is the actual email address.
|
||||
# Note that the replace won't work on just an email with no real name,
|
||||
# but the getaddresses() function seems to be able to handle just unclosed quotes
|
||||
# correctly. Not ideal, but this seems to work for now.
|
||||
sender_email = email.utils.getaddresses(
|
||||
['\"' + sender.replace('<', '\" <')])[0][1]
|
||||
|
||||
for ignore in IgnoreEmail.objects.filter(Q(queues=queue) | Q(queues__isnull=True)):
|
||||
if ignore.test(sender_email):
|
||||
# By returning 'False' the message will be kept in the mailbox,
|
||||
# and the 'True' will cause the message to be deleted.
|
||||
return not ignore.keep_in_mailbox
|
||||
|
||||
ticket_id: typing.Optional[int] = get_ticket_id_from_subject_slug(
|
||||
queue.slug,
|
||||
subject,
|
||||
logger
|
||||
)
|
||||
|
||||
def attempt_body_extract_from_html(message: str) -> str:
|
||||
mail = BeautifulSoup(str(message), "html.parser")
|
||||
beautiful_body = mail.find('body')
|
||||
body = None
|
||||
full_body = None
|
||||
counter = 0
|
||||
files = []
|
||||
if beautiful_body:
|
||||
try:
|
||||
body = beautiful_body.text
|
||||
full_body = body
|
||||
except AttributeError:
|
||||
pass
|
||||
if not body:
|
||||
body = ""
|
||||
return body, full_body
|
||||
|
||||
for part in message.walk():
|
||||
if part.get_content_maintype() == 'multipart':
|
||||
continue
|
||||
|
||||
name = part.get_param("name")
|
||||
def extract_part_data(
|
||||
part: Message,
|
||||
counter: int,
|
||||
ticket_id: int,
|
||||
files: List,
|
||||
logger: logging.Logger
|
||||
) -> Tuple[str, str]:
|
||||
name = part.get_filename()
|
||||
if name:
|
||||
name = email.utils.collapse_rfc2231_value(name)
|
||||
|
||||
part_body = None
|
||||
part_full_body = None
|
||||
if part.get_content_maintype() == 'text' and name is None:
|
||||
if part.get_content_subtype() == 'plain':
|
||||
body = part.get_payload(decode=True)
|
||||
part_body = part.get_payload(decode=True)
|
||||
# https://github.com/django-helpdesk/django-helpdesk/issues/732
|
||||
if part['Content-Transfer-Encoding'] == '8bit' and part.get_content_charset() == 'utf-8':
|
||||
body = body.decode('unicode_escape')
|
||||
body = decodeUnknown(part.get_content_charset(), body)
|
||||
# have to use django_settings here so overwritting it works in tests
|
||||
part_body = part_body.decode('unicode_escape')
|
||||
part_body = decodeUnknown(part.get_content_charset(), part_body)
|
||||
# have to use django_settings here so overwriting it works in tests
|
||||
# the default value is False anyway
|
||||
if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False):
|
||||
# first message in thread, we save full body to avoid
|
||||
# losing forwards and things like that
|
||||
full_body = get_body_from_fragments(body)
|
||||
body = EmailReplyParser.parse_reply(body)
|
||||
part_full_body = get_body_from_fragments(part_body)
|
||||
part_body = EmailReplyParser.parse_reply(part_body)
|
||||
else:
|
||||
# second and other reply, save only first part of the
|
||||
# message
|
||||
body = EmailReplyParser.parse_reply(body)
|
||||
full_body = body
|
||||
part_body = EmailReplyParser.parse_reply(part_body)
|
||||
part_full_body = part_body
|
||||
# workaround to get unicode text out rather than escaped text
|
||||
body = get_encoded_body(body)
|
||||
part_body = get_encoded_body(part_body)
|
||||
logger.debug("Discovered plain text MIME part")
|
||||
else:
|
||||
email_body = get_email_body_from_part_payload(part)
|
||||
|
||||
if not body and not full_body:
|
||||
if not part_body and not part_full_body:
|
||||
# no text has been parsed so far - try such deep parsing
|
||||
# for some messages
|
||||
altered_body = email_body.replace(
|
||||
"</p>", "</p>\n").replace("<br", "\n<br")
|
||||
mail = BeautifulSoup(str(altered_body), "html.parser")
|
||||
full_body = mail.get_text()
|
||||
part_full_body = mail.get_text()
|
||||
|
||||
if "<body" not in email_body:
|
||||
email_body = f"<body>{email_body}</body>"
|
||||
@ -677,48 +684,73 @@ def object_from_message(message: str,
|
||||
else:
|
||||
if not name:
|
||||
ext = mimetypes.guess_extension(part.get_content_type())
|
||||
name = "part-%i%s" % (counter, ext)
|
||||
name = f"part-{counter}{ext}"
|
||||
else:
|
||||
name = ("part-%i_" % counter) + name
|
||||
name = f"part-{counter}_{name}"
|
||||
|
||||
# # FIXME: this code gets the paylods, then does something with it and then completely ignores it
|
||||
# # writing the part.get_payload(decode=True) instead; and then the payload variable is
|
||||
# # replaced by some dict later.
|
||||
# # the `payloadToWrite` has been also ignored so was commented
|
||||
# payload = part.get_payload()
|
||||
# if isinstance(payload, list):
|
||||
# payload = payload.pop().as_string()
|
||||
# # payloadToWrite = payload
|
||||
# # check version of python to ensure use of only the correct error type
|
||||
# non_b64_err = TypeError
|
||||
# try:
|
||||
# logger.debug("Try to base64 decode the attachment payload")
|
||||
# # payloadToWrite = base64.decodebytes(payload)
|
||||
# except non_b64_err:
|
||||
# logger.debug("Payload was not base64 encoded, using raw bytes")
|
||||
# # payloadToWrite = payload
|
||||
files.append(SimpleUploadedFile(name, part.get_payload(
|
||||
decode=True), mimetypes.guess_type(name)[0]))
|
||||
logger.debug("Found MIME attachment %s" % name)
|
||||
files.append(SimpleUploadedFile(name, part.get_payload(decode=True), mimetypes.guess_type(name)[0]))
|
||||
logger.debug("Found MIME attachment %s", name)
|
||||
return part_body, part_full_body
|
||||
|
||||
|
||||
def object_from_message(message: str,
|
||||
queue: Queue,
|
||||
logger: logging.Logger
|
||||
) -> Ticket:
|
||||
# 'message' must be an RFC822 formatted message to correctly parse.
|
||||
message_obj = email.message_from_string(message)
|
||||
|
||||
subject = message_obj.get('subject', _('Comment from e-mail'))
|
||||
subject = decode_mail_headers(
|
||||
decodeUnknown(message_obj.get_charset(), subject))
|
||||
for affix in STRIPPED_SUBJECT_STRINGS:
|
||||
subject = subject.replace(affix, "")
|
||||
subject = subject.strip()
|
||||
|
||||
# TODO: Should really be assigning a properly formatted fake email.
|
||||
# Check if anything relies on this being a "real name" formatted string if no sender is found on message_obj.
|
||||
# Also not sure it should be accepting emails from unknown senders
|
||||
sender_email = _('Unknown Sender')
|
||||
sender_hdr = message_obj.get('from')
|
||||
if sender_hdr:
|
||||
# Parse the header which extracts the first email address in the list if more than one
|
||||
# The parseaddr method returns a tuple in the form <real name> <email address>
|
||||
# Only need the actual email address from the tuple not the "real name"
|
||||
# Since the spec requires that all email addresses are ASCII, they will not be encoded
|
||||
sender_email = email.utils.parseaddr(sender_hdr)[1]
|
||||
|
||||
for ignore in IgnoreEmail.objects.filter(Q(queues=queue) | Q(queues__isnull=True)):
|
||||
if ignore.test(sender_email):
|
||||
raise IgnoreTicketException() if ignore.keep_in_mailbox else DeleteIgnoredTicketException()
|
||||
|
||||
ticket_id: typing.Optional[int] = get_ticket_id_from_subject_slug(
|
||||
queue.slug,
|
||||
subject,
|
||||
logger
|
||||
)
|
||||
|
||||
body = None
|
||||
full_body = None
|
||||
counter = 0
|
||||
files = []
|
||||
|
||||
for part in message_obj.walk():
|
||||
if part.get_content_maintype() == 'multipart':
|
||||
continue
|
||||
# See email.message_obj.Message.get_filename()
|
||||
part_body, part_full_body = extract_part_data(part, counter, ticket_id, files, logger)
|
||||
if part_body:
|
||||
body = part_body
|
||||
full_body = part_full_body
|
||||
counter += 1
|
||||
|
||||
if not body:
|
||||
mail = BeautifulSoup(str(message), "html.parser")
|
||||
beautiful_body = mail.find('body')
|
||||
if beautiful_body:
|
||||
try:
|
||||
body = beautiful_body.text
|
||||
full_body = body
|
||||
except AttributeError:
|
||||
pass
|
||||
if not body:
|
||||
body = ""
|
||||
body, full_body = attempt_body_extract_from_html(message_obj)
|
||||
|
||||
add_file_if_always_save_incoming_email_message(files, message)
|
||||
add_file_if_always_save_incoming_email_message(files, message_obj)
|
||||
|
||||
smtp_priority = message.get('priority', '')
|
||||
smtp_importance = message.get('importance', '')
|
||||
smtp_priority = message_obj.get('priority', '')
|
||||
smtp_importance = message_obj.get('importance', '')
|
||||
high_priority_types = {'high', 'important', '1', 'urgent'}
|
||||
priority = 2 if high_priority_types & {
|
||||
smtp_priority, smtp_importance} else 3
|
||||
@ -733,4 +765,4 @@ def object_from_message(message: str,
|
||||
'files': files,
|
||||
}
|
||||
|
||||
return create_object_from_email_message(message, ticket_id, payload, files, logger=logger)
|
||||
return create_object_from_email_message(message_obj, ticket_id, payload, files, logger=logger)
|
||||
|
13
helpdesk/exceptions.py
Normal file
13
helpdesk/exceptions.py
Normal file
@ -0,0 +1,13 @@
|
||||
class IgnoreTicketException(Exception):
|
||||
"""
|
||||
Raised when an email message is received from a sender who is marked to be ignored
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class DeleteIgnoredTicketException(Exception):
|
||||
"""
|
||||
Raised when an email message is received from a sender who is marked to be ignored
|
||||
and the record is tagged to delete the email from the inbox
|
||||
"""
|
||||
pass
|
@ -7,6 +7,7 @@ forms.py - Definitions of newforms-based forms for creating and maintaining
|
||||
tickets.
|
||||
"""
|
||||
|
||||
|
||||
from datetime import datetime
|
||||
from django import forms
|
||||
from django.conf import settings
|
||||
@ -33,6 +34,7 @@ from helpdesk.settings import (
|
||||
CUSTOMFIELD_TIME_FORMAT,
|
||||
CUSTOMFIELD_TO_FIELD_DICT
|
||||
)
|
||||
from helpdesk.validators import validate_file_extension
|
||||
import logging
|
||||
|
||||
|
||||
@ -243,6 +245,7 @@ class AbstractTicketForm(CustomFieldMixin, forms.Form):
|
||||
'Only file types such as plain text (.txt), '
|
||||
'a document (.pdf, .docx, or .odt), '
|
||||
'or screenshot (.png or .jpg) may be uploaded.'),
|
||||
validators=[validate_file_extension]
|
||||
)
|
||||
|
||||
class Media:
|
||||
|
@ -9,6 +9,7 @@ lib.py - Common functions (eg multipart e-mail)
|
||||
|
||||
from datetime import date, datetime, time
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.encoding import smart_str
|
||||
from helpdesk.settings import CUSTOMFIELD_DATE_FORMAT, CUSTOMFIELD_DATETIME_FORMAT, CUSTOMFIELD_TIME_FORMAT
|
||||
import logging
|
||||
@ -80,12 +81,12 @@ def text_is_spam(text, request):
|
||||
# This will return 'True' is the given text is deemed to be spam, or
|
||||
# False if it is not spam. If it cannot be checked for some reason, we
|
||||
# assume it isn't spam.
|
||||
from django.contrib.sites.models import Site
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
try:
|
||||
from akismet import Akismet
|
||||
except ImportError:
|
||||
return False
|
||||
from django.contrib.sites.models import Site
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
try:
|
||||
site = Site.objects.get_current()
|
||||
except ImproperlyConfigured:
|
||||
@ -132,6 +133,7 @@ def process_attachments(followup, attached_files):
|
||||
max_email_attachment_size = getattr(
|
||||
settings, 'HELPDESK_MAX_EMAIL_ATTACHMENT_SIZE', 512000)
|
||||
attachments = []
|
||||
errors = set()
|
||||
|
||||
for attached in attached_files:
|
||||
|
||||
@ -148,7 +150,11 @@ def process_attachments(followup, attached_files):
|
||||
'application/octet-stream',
|
||||
size=attached.size,
|
||||
)
|
||||
try:
|
||||
att.full_clean()
|
||||
except ValidationError as e:
|
||||
errors.add(e)
|
||||
else:
|
||||
att.save()
|
||||
|
||||
if attached.size < max_email_attachment_size:
|
||||
@ -157,6 +163,9 @@ def process_attachments(followup, attached_files):
|
||||
# email.
|
||||
attachments.append([filename, att.file])
|
||||
|
||||
if errors:
|
||||
raise ValidationError(list(errors))
|
||||
|
||||
return attachments
|
||||
|
||||
|
||||
|
@ -27,27 +27,6 @@ def query_from_base64(b64data):
|
||||
return query
|
||||
|
||||
|
||||
def query_to_dict(results, descriptions):
|
||||
"""
|
||||
Replacement method for cursor.dictfetchall() as that method no longer
|
||||
exists in psycopg2, and I'm guessing in other backends too.
|
||||
|
||||
Converts the results of a raw SQL query into a list of dictionaries, suitable
|
||||
for use in templates etc.
|
||||
"""
|
||||
|
||||
output = []
|
||||
for data in results:
|
||||
row = {}
|
||||
i = 0
|
||||
for column in descriptions:
|
||||
row[column[0]] = data[i]
|
||||
i += 1
|
||||
|
||||
output.append(row)
|
||||
return output
|
||||
|
||||
|
||||
def get_search_filter_args(search):
|
||||
if search.startswith('queue:'):
|
||||
return Q(queue__title__icontains=search[len('queue:'):])
|
||||
|
@ -1,13 +1,19 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.contrib.auth.models import User
|
||||
from django.core import mail
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.core.management import call_command
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.test import override_settings, TestCase
|
||||
import helpdesk.email
|
||||
from helpdesk.email import extract_part_data, object_from_message
|
||||
from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException
|
||||
from helpdesk.management.commands.get_email import Command
|
||||
from helpdesk.models import FollowUp, FollowUpAttachment, Queue, Ticket, TicketCC
|
||||
from helpdesk.models import FollowUp, FollowUpAttachment, IgnoreEmail, Queue, Ticket, TicketCC
|
||||
from helpdesk.tests import utils
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
@ -15,6 +21,7 @@ from shutil import rmtree
|
||||
import six
|
||||
import sys
|
||||
from tempfile import mkdtemp
|
||||
import typing
|
||||
from unittest import mock
|
||||
|
||||
|
||||
@ -30,7 +37,7 @@ unused_port = "49151"
|
||||
class GetEmailCommonTests(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.queue_public = Queue.objects.create()
|
||||
self.queue_public = Queue.objects.create(title='Test', slug='test')
|
||||
self.logger = logging.getLogger('helpdesk')
|
||||
|
||||
# tests correct syntax for command line option
|
||||
@ -42,7 +49,7 @@ class GetEmailCommonTests(TestCase):
|
||||
with mock.patch.object(Command, 'handle', return_value=None) as mocked_handle:
|
||||
call_command('get_email', quiet=quiet_test_value)
|
||||
mocked_handle.assert_called_once()
|
||||
for args, kwargs in mocked_handle.call_args_list:
|
||||
for _, kwargs in mocked_handle.call_args_list:
|
||||
self.assertEqual(quiet_test_value, (kwargs['quiet']))
|
||||
|
||||
def test_email_with_blank_body_and_attachment(self):
|
||||
@ -132,6 +139,98 @@ class GetEmailCommonTests(TestCase):
|
||||
assert "Hello there!" in FollowUp.objects.filter(
|
||||
ticket=ticket).first().comment
|
||||
|
||||
def test_will_delete_ignored_email(self):
|
||||
"""
|
||||
Tests if an email will be ignored if configured to do so and throws the correct exception
|
||||
to ensure the email is deleted
|
||||
"""
|
||||
message, from_meta, _ = utils.generate_text_email(locale="es_ES")
|
||||
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=False)
|
||||
ignore.save()
|
||||
with self.assertRaises(DeleteIgnoredTicketException):
|
||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
||||
|
||||
def test_will_not_delete_ignored_email(self):
|
||||
"""
|
||||
Tests if an email will be ignored if configured to do so and throws the correct exception
|
||||
to ensure the email is NOT deleted
|
||||
"""
|
||||
message, from_meta, _ = utils.generate_text_email(locale="es_ES")
|
||||
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=True)
|
||||
ignore.save()
|
||||
with self.assertRaises(IgnoreTicketException):
|
||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
||||
|
||||
def test_utf8_filename_attachment(self):
|
||||
"""
|
||||
Tests if an attachment correctly sent with a UTF8 filename in disposition is extracted correctly
|
||||
"""
|
||||
filename = "TeléfonoMañana.txt"
|
||||
part = utils.generate_file_mime_part(locale="es_ES", filename=filename)
|
||||
files: typing.List[SimpleUploadedFile] = []
|
||||
extract_part_data(part, counter=1, ticket_id="tst1", files=files, logger=self.logger)
|
||||
sent_file: SimpleUploadedFile = files[0]
|
||||
# The extractor prepends a part identifier so compare the ending
|
||||
self.assertTrue(sent_file.name.endswith(filename), f"Filename extracted does not match: {sent_file.name}")
|
||||
|
||||
@override_settings(VALID_EXTENSIONS=['.png'])
|
||||
def test_wrong_extension_attachment(self):
|
||||
"""
|
||||
Tests if an attachment with a wrong extension doesn't stop the email process
|
||||
"""
|
||||
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image'])
|
||||
|
||||
self.assertEqual(len(mail.outbox), 0)
|
||||
|
||||
with self.assertLogs(logger='helpdesk', level='ERROR') as cm:
|
||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
||||
|
||||
self.assertIn(
|
||||
"ERROR:helpdesk:['Unsupported file extension: .jpg']",
|
||||
cm.output
|
||||
)
|
||||
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
||||
|
||||
def test_multiple_attachments(self):
|
||||
"""
|
||||
Tests the saving of multiple attachments
|
||||
"""
|
||||
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'file', 'image'])
|
||||
|
||||
self.assertEqual(len(mail.outbox), 0)
|
||||
|
||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
||||
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
||||
|
||||
ticket = Ticket.objects.get()
|
||||
followup = ticket.followup_set.get()
|
||||
self.assertEqual(2, followup.followupattachment_set.count())
|
||||
|
||||
@override_settings(VALID_EXTENSIONS=['.txt'])
|
||||
def test_multiple_attachments_with_wrong_extension(self):
|
||||
"""
|
||||
Tests that a wrong extension won't stop from saving other valid attachment
|
||||
"""
|
||||
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image', 'file', 'image'])
|
||||
|
||||
self.assertEqual(len(mail.outbox), 0)
|
||||
|
||||
with self.assertLogs(logger='helpdesk', level='ERROR') as cm:
|
||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
||||
|
||||
self.assertIn(
|
||||
"ERROR:helpdesk:['Unsupported file extension: .jpg']",
|
||||
cm.output
|
||||
)
|
||||
|
||||
ticket = Ticket.objects.get()
|
||||
followup = ticket.followup_set.get()
|
||||
self.assertEqual(1, followup.followupattachment_set.count())
|
||||
|
||||
|
||||
class GetEmailParametricTemplate(object):
|
||||
"""TestCase that checks basic email functionality across methods and socks configs."""
|
||||
@ -258,12 +357,12 @@ class GetEmailParametricTemplate(object):
|
||||
"""Tests correctly decoding mail headers when a comma is encoded into
|
||||
UTF-8. See bug report #832."""
|
||||
|
||||
# example email text from Django docs:
|
||||
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
||||
test_email_from = "Bernard-Bouissières, Benjamin <bbb@example.com>"
|
||||
# Create the from using standard RFC required formats
|
||||
# Override the last_name to ensure we get a non-ascii character in it
|
||||
test_email_from_meta = utils.generate_email_address("fr_FR", last_name_override="Bouissières")
|
||||
test_email_subject = "Commas in From lines"
|
||||
test_email_body = "Testing commas in from email UTF-8."
|
||||
test_email = "To: helpdesk@example.com\nFrom: " + test_email_from + \
|
||||
test_email = "To: helpdesk@example.com\nFrom: " + test_email_from_meta[0] + \
|
||||
"\nSubject: " + test_email_subject + "\n\n" + test_email_body
|
||||
test_mail_len = len(test_email)
|
||||
|
||||
@ -333,13 +432,13 @@ class GetEmailParametricTemplate(object):
|
||||
|
||||
ticket1 = get_object_or_404(Ticket, pk=1)
|
||||
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
||||
self.assertEqual(ticket1.submitter_email, 'bbb@example.com')
|
||||
self.assertEqual(ticket1.submitter_email, test_email_from_meta[1])
|
||||
self.assertEqual(ticket1.title, test_email_subject)
|
||||
self.assertEqual(ticket1.description, test_email_body)
|
||||
|
||||
ticket2 = get_object_or_404(Ticket, pk=2)
|
||||
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
||||
self.assertEqual(ticket2.submitter_email, 'bbb@example.com')
|
||||
self.assertEqual(ticket2.submitter_email, test_email_from_meta[1])
|
||||
self.assertEqual(ticket2.title, test_email_subject)
|
||||
self.assertEqual(ticket2.description, test_email_body)
|
||||
|
||||
|
266
helpdesk/tests/utils.py
Normal file
266
helpdesk/tests/utils.py
Normal file
@ -0,0 +1,266 @@
|
||||
"""UItility functions facilitate making unit testing easier and less brittle."""
|
||||
|
||||
|
||||
from PIL import Image
|
||||
import email
|
||||
from email import encoders
|
||||
from email.message import Message
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
import factory
|
||||
import faker
|
||||
from io import BytesIO
|
||||
from numpy.random import randint
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import typing
|
||||
from typing import Any, Optional, Tuple
|
||||
import unicodedata
|
||||
|
||||
|
||||
def strip_accents(text):
|
||||
"""
|
||||
Strip accents from input String. (only works on Pythin 3
|
||||
|
||||
:param text: The input string.
|
||||
:type text: String.
|
||||
|
||||
:returns: The processed String.
|
||||
:rtype: String.
|
||||
"""
|
||||
text = unicodedata.normalize('NFD', text)
|
||||
text = text.encode('ascii', 'ignore')
|
||||
text = text.decode("utf-8")
|
||||
return str(text)
|
||||
|
||||
|
||||
def text_to_id(text):
|
||||
"""
|
||||
Convert input text to id.
|
||||
|
||||
:param text: The input string.
|
||||
:type text: String.
|
||||
|
||||
:returns: The processed String.
|
||||
:rtype: String.
|
||||
"""
|
||||
text = strip_accents(text.lower())
|
||||
text = re.sub('[ ]+', '_', text)
|
||||
text = re.sub('[^0-9a-zA-Z_-]', '', text)
|
||||
return text
|
||||
|
||||
|
||||
def get_random_string(length: int=16) -> str:
|
||||
return "".join(
|
||||
[random.choice(string.ascii_letters + string.digits) for _ in range(length)]
|
||||
)
|
||||
|
||||
|
||||
def generate_random_image(image_format, array_dims):
|
||||
"""
|
||||
Creates an image from a random array.
|
||||
|
||||
:param image_format: An image format (PNG or JPEG).
|
||||
:param array_dims: A tuple with array dimensions.
|
||||
|
||||
:returns: A byte string with encoded image
|
||||
:rtype: bytes
|
||||
"""
|
||||
image_bytes = randint(low=0, high=255, size=array_dims, dtype='uint8')
|
||||
io = BytesIO()
|
||||
image_pil = Image.fromarray(image_bytes)
|
||||
image_pil.save(io, image_format, subsampling=0, quality=100)
|
||||
return io.getvalue()
|
||||
|
||||
|
||||
def get_random_image(image_format: str="PNG", size: int=5):
|
||||
"""
|
||||
Returns a random image.
|
||||
|
||||
Args:
|
||||
image_format: An image format (PNG or JPEG).
|
||||
|
||||
Returns:
|
||||
A string with encoded image
|
||||
"""
|
||||
return generate_random_image(image_format, (size, size, 3))
|
||||
|
||||
|
||||
def get_fake(provider: str, locale: str = "en_US", min_length: int = 5) -> Any:
|
||||
"""
|
||||
Generates a random string, float, integer etc based on provider
|
||||
Provider can be "text', 'sentence', "word"
|
||||
e.g. `get_fake('name')` ==> 'Buzz Aldrin'
|
||||
"""
|
||||
string = factory.Faker(provider).evaluate({}, None, {'locale': locale,})
|
||||
while len(string) < min_length:
|
||||
string += factory.Faker(provider).evaluate({}, None, {'locale': locale,})
|
||||
return string
|
||||
|
||||
|
||||
def get_fake_html(locale: str = "en_US", wrap_in_body_tag=True) -> Any:
|
||||
"""
|
||||
Generates a random string, float, integer etc based on provider
|
||||
Provider can be "text', 'sentence',
|
||||
e.g. `get_fake('name')` ==> 'Buzz Aldrin'
|
||||
"""
|
||||
html = factory.Faker("sentence").evaluate({}, None, {'locale': locale,})
|
||||
for _ in range(0,4):
|
||||
html += "<li>" + factory.Faker("sentence").evaluate({}, None, {'locale': locale,}) + "</li>"
|
||||
for _ in range(0,4):
|
||||
html += "<p>" + factory.Faker("text").evaluate({}, None, {'locale': locale,})
|
||||
return f"<body>{html}</body>" if wrap_in_body_tag else html
|
||||
|
||||
def generate_email_address(
|
||||
locale: str="en_US",
|
||||
use_short_email: bool=False,
|
||||
real_name_format: Optional[str]="{last_name}, {first_name}",
|
||||
last_name_override: Optional[str]=None) -> Tuple[str, str, str, str]:
|
||||
'''
|
||||
Generate an RFC 2822 email address
|
||||
|
||||
:param locale: change this to generate locale specific names
|
||||
:param use_short_email: defaults to false. If true then does not include real name in email address
|
||||
:param real_name_format: pass a different format if different than "{last_name}, {first_name}"
|
||||
:param last_name_override: override the fake name if you want some special characters in the last name
|
||||
:returns <RFC2822 formatted email for header>, <short email address>, <first name>, <last_name
|
||||
'''
|
||||
fake = faker.Faker(locale=locale)
|
||||
first_name = fake.first_name()
|
||||
last_name = last_name_override or fake.last_name()
|
||||
real_name = None if use_short_email else real_name_format.format(first_name=first_name, last_name=last_name)
|
||||
# Add a random string to ensure we do not generate a real domain name
|
||||
email_address = "{}.{}@{}".format(
|
||||
first_name.replace(' ', '').encode("ascii", "ignore").lower().decode(),
|
||||
last_name.replace(' ', '').encode("ascii", "ignore").lower().decode(),
|
||||
get_random_string(5) + fake.domain_name()
|
||||
)
|
||||
# format email address for RFC 2822 and return
|
||||
return email.utils.formataddr((real_name, email_address)), email_address, first_name, last_name
|
||||
|
||||
def generate_file_mime_part(locale: str="en_US",filename: str = None) -> Message:
|
||||
"""
|
||||
|
||||
:param locale: change this to generate locale specific file name and attachment content
|
||||
:param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated
|
||||
"""
|
||||
part = MIMEBase('application', 'octet-stream')
|
||||
part.set_payload(get_fake("text", locale=locale, min_length=1024))
|
||||
encoders.encode_base64(part)
|
||||
if not filename:
|
||||
filename = get_fake("word", locale=locale, min_length=8) + ".txt"
|
||||
part.add_header('Content-Disposition', "attachment; filename= %s" % filename)
|
||||
return part
|
||||
|
||||
def generate_image_mime_part(locale: str="en_US",imagename: str = None) -> Message:
|
||||
"""
|
||||
|
||||
:param locale: change this to generate locale specific file name and attachment content
|
||||
:param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated
|
||||
"""
|
||||
part = MIMEImage(generate_random_image(image_format="JPEG", array_dims=(200, 200)))
|
||||
part.set_payload(get_fake("text", locale=locale, min_length=1024))
|
||||
encoders.encode_base64(part)
|
||||
if not imagename:
|
||||
imagename = get_fake("word", locale=locale, min_length=8) + ".jpg"
|
||||
part.add_header('Content-Disposition', "attachment; filename= %s" % imagename)
|
||||
return part
|
||||
|
||||
def generate_email_list(address_cnt: int = 3,
|
||||
locale: str="en_US",
|
||||
use_short_email: bool=False
|
||||
) -> str:
|
||||
"""
|
||||
Generates a list of email addresses formatted for email headers on a Mime part
|
||||
|
||||
:param address_cnt: the number of email addresses to string together
|
||||
:param locale: change this to generate locale specific "real names" and subject
|
||||
:param use_short_email: produces a email address without "real name" if True
|
||||
|
||||
"""
|
||||
email_address_list = [generate_email_address(locale, use_short_email=use_short_email)[0] for _ in range(0, address_cnt)]
|
||||
return ",".join(email_address_list)
|
||||
|
||||
def add_simple_email_headers(message: Message, locale: str="en_US",
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Adds the key email headers to a Mime part
|
||||
|
||||
:param message: the Mime part to add headers to
|
||||
:param locale: change this to generate locale specific "real names" and subject
|
||||
:param use_short_email: produces a "To" or "From" that is only the email address if True
|
||||
|
||||
"""
|
||||
to_meta = generate_email_address(locale, use_short_email=use_short_email)
|
||||
from_meta = generate_email_address(locale, use_short_email=use_short_email)
|
||||
|
||||
message['Subject'] = get_fake("sentence", locale=locale)
|
||||
message['From'] = from_meta[0]
|
||||
message['To'] = to_meta[0]
|
||||
return from_meta, to_meta
|
||||
|
||||
def generate_mime_part(locale: str="en_US",
|
||||
part_type: str="plain",
|
||||
) -> typing.Optional[Message]:
|
||||
"""
|
||||
Generates amime part of the sepecified type
|
||||
|
||||
:param locale: change this to generate locale specific strings
|
||||
:param text_type: options are plain, html, image (attachment), file (attachment)
|
||||
"""
|
||||
if "plain" == part_type:
|
||||
body = get_fake("text", locale=locale, min_length=1024)
|
||||
msg = MIMEText(body)
|
||||
elif "html" == part_type:
|
||||
body = get_fake_html(locale=locale, wrap_in_body_tag=True)
|
||||
msg = MIMEText(body)
|
||||
elif "file" == part_type:
|
||||
msg = generate_file_mime_part(locale=locale)
|
||||
elif "image" == part_type:
|
||||
msg = generate_image_mime_part(locale=locale)
|
||||
else:
|
||||
raise Exception("Mime part not implemented: " + part_type)
|
||||
return msg
|
||||
|
||||
def generate_multipart_email(locale: str="en_US",
|
||||
type_list: typing.List[str]=["plain", "html", "attachment"],
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Generates an email including headers with the defined multiparts
|
||||
|
||||
:param locale:
|
||||
:param type_list: options are plain, html, image (attachment), file (attachment)
|
||||
:param use_short_email: produces a "To" or "From" that is only the email address if True
|
||||
"""
|
||||
msg = MIMEMultipart()
|
||||
for part_type in type_list:
|
||||
msg.attach(generate_mime_part(locale=locale, part_type=part_type))
|
||||
from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email)
|
||||
return msg, from_meta, to_meta
|
||||
|
||||
def generate_text_email(locale: str="en_US",
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Generates an email including headers
|
||||
"""
|
||||
body = get_fake("text", locale=locale, min_length=1024)
|
||||
msg = MIMEText(body)
|
||||
from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email)
|
||||
return msg, from_meta, to_meta
|
||||
|
||||
def generate_html_email(locale: str="en_US",
|
||||
use_short_email: bool=False
|
||||
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||
"""
|
||||
Generates an email including headers
|
||||
"""
|
||||
body = get_fake_html(locale=locale)
|
||||
msg = MIMEText(body)
|
||||
from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email)
|
||||
return msg, from_meta, to_meta
|
@ -151,7 +151,7 @@ urlpatterns += [
|
||||
|
||||
urlpatterns += [
|
||||
re_path(
|
||||
r"^rss/user/(?P<user_name>[a-zA-Z0-9\_\.]+)/",
|
||||
r"^rss/user/(?P<user_name>[^/]+)/",
|
||||
helpdesk_staff_member_required(feeds.OpenTicketsByUser()),
|
||||
name="rss_user",
|
||||
),
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
|
||||
# TODO: can we use the builtin Django validator instead?
|
||||
@ -32,4 +33,5 @@ def validate_file_extension(value):
|
||||
# should always allow that?
|
||||
if not (ext.lower() == '' or ext.lower() == '.'):
|
||||
raise ValidationError(
|
||||
'Unsupported file extension: %s.' % ext.lower())
|
||||
_('Unsupported file extension: ') + ext.lower()
|
||||
)
|
||||
|
@ -24,7 +24,9 @@ from django.http import Http404, HttpResponse, HttpResponseRedirect, JsonRespons
|
||||
from django.shortcuts import get_object_or_404, redirect, render
|
||||
from django.urls import reverse, reverse_lazy
|
||||
from django.utils import timezone
|
||||
from django.utils.dateparse import parse_datetime
|
||||
from django.utils.html import escape
|
||||
from django.utils.timezone import make_aware
|
||||
from django.utils.translation import gettext as _
|
||||
from django.views.decorators.csrf import requires_csrf_token
|
||||
from django.views.generic.edit import FormView, UpdateView
|
||||
@ -519,12 +521,7 @@ def get_due_date_from_request_or_ticket(
|
||||
due_date = request.POST.get('due_date', None) or None
|
||||
|
||||
if due_date is not None:
|
||||
# based on Django code to parse dates:
|
||||
# https://docs.djangoproject.com/en/2.0/_modules/django/utils/dateparse/
|
||||
match = DATE_RE.match(due_date)
|
||||
if match:
|
||||
kw = {k: int(v) for k, v in match.groupdict().items()}
|
||||
due_date = date(**kw)
|
||||
due_date = make_aware(parse_datetime(due_date))
|
||||
else:
|
||||
due_date_year = int(request.POST.get('due_date_year', 0))
|
||||
due_date_month = int(request.POST.get('due_date_month', 0))
|
||||
|
5
requirements-dev.txt
Normal file
5
requirements-dev.txt
Normal file
@ -0,0 +1,5 @@
|
||||
tox
|
||||
autopep8
|
||||
flake8
|
||||
pycodestyle
|
||||
isort
|
@ -7,3 +7,6 @@ pbr
|
||||
mock
|
||||
freezegun
|
||||
isort
|
||||
numpy
|
||||
factory_boy
|
||||
faker
|
Loading…
Reference in New Issue
Block a user