forked from extern/django-helpdesk
948 lines
43 KiB
Python
948 lines
43 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.contrib.auth.models import User
|
|
from django.core import mail
|
|
from django.core.files.uploadedfile import SimpleUploadedFile
|
|
from django.core.management import call_command
|
|
from django.shortcuts import get_object_or_404
|
|
from django.test import override_settings, TestCase
|
|
import helpdesk.email
|
|
from helpdesk.email import extract_part_data, object_from_message
|
|
from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException
|
|
from helpdesk.management.commands.get_email import Command
|
|
from helpdesk.models import FollowUp, FollowUpAttachment, IgnoreEmail, Queue, Ticket, TicketCC
|
|
from helpdesk.tests import utils
|
|
import itertools
|
|
import logging
|
|
import os
|
|
from shutil import rmtree
|
|
import sys
|
|
from tempfile import mkdtemp
|
|
import typing
|
|
from unittest import mock
|
|
|
|
|
|
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
# class A addresses can't have first octet of 0
|
|
unrouted_socks_server = "0.0.0.1"
|
|
unrouted_email_server = "0.0.0.1"
|
|
# the last user port, reserved by IANA
|
|
unused_port = "49151"
|
|
|
|
|
|
class GetEmailCommonTests(TestCase):
|
|
|
|
def setUp(self):
|
|
self.queue_public = Queue.objects.create(title='Test', slug='test')
|
|
self.logger = logging.getLogger('helpdesk')
|
|
|
|
# tests correct syntax for command line option
|
|
def test_get_email_quiet_option(self):
|
|
"""Test quiet option is properly propagated"""
|
|
# Test get_email with quiet set to True and also False, and verify
|
|
# handle receives quiet option set properly
|
|
for quiet_test_value in [True, False]:
|
|
with mock.patch.object(Command, 'handle', return_value=None) as mocked_handle:
|
|
call_command('get_email', quiet=quiet_test_value)
|
|
mocked_handle.assert_called_once()
|
|
for _, kwargs in mocked_handle.call_args_list:
|
|
self.assertEqual(quiet_test_value, (kwargs['quiet']))
|
|
|
|
def test_email_with_blank_body_and_attachment(self):
|
|
"""
|
|
Tests that emails with blank bodies and attachments work.
|
|
https://github.com/django-helpdesk/django-helpdesk/issues/700
|
|
"""
|
|
with open(os.path.join(THIS_DIR, "test_files/blank-body-with-attachment.eml")) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.object_from_message(
|
|
test_email, self.queue_public, self.logger)
|
|
|
|
# title got truncated because of max_lengh of the model.title field
|
|
assert ticket.title == (
|
|
"Attachment without body - and a loooooooooooooooooooooooooooooooooo"
|
|
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
|
|
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo..."
|
|
)
|
|
self.assertEqual(ticket.description, "")
|
|
|
|
def test_email_with_quoted_printable_body(self):
|
|
"""
|
|
Tests that emails with quoted-printable bodies work.
|
|
"""
|
|
with open(os.path.join(THIS_DIR, "test_files/quoted_printable.eml")) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.object_from_message(
|
|
test_email, self.queue_public, self.logger)
|
|
self.assertEqual(ticket.title, "Český test")
|
|
self.assertEqual(ticket.description,
|
|
"Tohle je test českých písmen odeslaných z gmailu.")
|
|
followups = FollowUp.objects.filter(ticket=ticket)
|
|
self.assertEqual(len(followups), 1)
|
|
followup = followups[0]
|
|
attachments = FollowUpAttachment.objects.filter(followup=followup)
|
|
self.assertEqual(len(attachments), 1)
|
|
attachment = attachments[0]
|
|
self.assertIn('<div dir="ltr">Tohle je test českých písmen odeslaných z gmailu.</div>\n',
|
|
attachment.file.read().decode("utf-8"))
|
|
|
|
def test_email_with_8bit_encoding_and_utf_8(self):
|
|
"""
|
|
Tests that emails with 8bit transfer encoding and utf-8 charset
|
|
https://github.com/django-helpdesk/django-helpdesk/issues/732
|
|
"""
|
|
with open(os.path.join(THIS_DIR, "test_files/all-special-chars.eml")) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.object_from_message(
|
|
test_email, self.queue_public, self.logger)
|
|
self.assertEqual(ticket.title, "Testovácí email")
|
|
self.assertEqual(ticket.description, "íářčšáíéřášč")
|
|
|
|
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=True)
|
|
def test_email_with_utf_8_non_decodable_sequences(self):
|
|
"""
|
|
Tests that emails with utf-8 non-decodable sequences are parsed correctly
|
|
The message is fowarded as well
|
|
"""
|
|
with open(os.path.join(THIS_DIR, "test_files/utf-nondecodable.eml")) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.object_from_message(
|
|
test_email, self.queue_public, self.logger)
|
|
self.assertEqual(
|
|
ticket.title, "Fwd: Cyklozaměstnavatel - změna vyhodnocení")
|
|
self.assertIn("prosazuje lepší", ticket.description)
|
|
followups = FollowUp.objects.filter(ticket=ticket)
|
|
followup = followups[0]
|
|
attachments = FollowUpAttachment.objects.filter(followup=followup)
|
|
attachment = attachments[0]
|
|
self.assertIn('prosazuje lepší',
|
|
attachment.file.read().decode("utf-8"))
|
|
|
|
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=True)
|
|
def test_email_with_forwarded_message(self):
|
|
"""
|
|
Forwarded message of that format must be still attached correctly
|
|
"""
|
|
with open(os.path.join(THIS_DIR, "test_files/forwarded-message.eml")) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.object_from_message(
|
|
test_email, self.queue_public, self.logger)
|
|
self.assertEqual(
|
|
ticket.title, "Test with original message from GitHub")
|
|
self.assertIn("This is email body", ticket.description)
|
|
assert "Hello there!" not in ticket.description, ticket.description
|
|
assert FollowUp.objects.filter(ticket=ticket).count() == 1
|
|
assert "Hello there!" in FollowUp.objects.filter(
|
|
ticket=ticket).first().comment
|
|
|
|
def test_will_delete_ignored_email(self):
|
|
"""
|
|
Tests if an email will be ignored if configured to do so and throws the correct exception
|
|
to ensure the email is deleted
|
|
"""
|
|
message, from_meta, _ = utils.generate_text_email(locale="es_ES")
|
|
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=False)
|
|
ignore.save()
|
|
with self.assertRaises(DeleteIgnoredTicketException):
|
|
object_from_message(message.as_string(), self.queue_public, self.logger)
|
|
|
|
def test_will_not_delete_ignored_email(self):
|
|
"""
|
|
Tests if an email will be ignored if configured to do so and throws the correct exception
|
|
to ensure the email is NOT deleted
|
|
"""
|
|
message, from_meta, _ = utils.generate_text_email(locale="es_ES")
|
|
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=True)
|
|
ignore.save()
|
|
with self.assertRaises(IgnoreTicketException):
|
|
object_from_message(message.as_string(), self.queue_public, self.logger)
|
|
|
|
def test_utf8_filename_attachment(self):
|
|
"""
|
|
Tests if an attachment correctly sent with a UTF8 filename in disposition is extracted correctly
|
|
"""
|
|
filename = "TeléfonoMañana.txt"
|
|
part = utils.generate_file_mime_part(locale="es_ES", filename=filename)
|
|
files: typing.List[SimpleUploadedFile] = []
|
|
extract_part_data(part, counter=1, ticket_id="tst1", files=files, logger=self.logger)
|
|
sent_file: SimpleUploadedFile = files[0]
|
|
# The extractor prepends a part identifier so compare the ending
|
|
self.assertTrue(sent_file.name.endswith(filename), f"Filename extracted does not match: {sent_file.name}")
|
|
|
|
@override_settings(VALID_EXTENSIONS=['.png'])
|
|
def test_wrong_extension_attachment(self):
|
|
"""
|
|
Tests if an attachment with a wrong extension doesn't stop the email process
|
|
"""
|
|
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image'])
|
|
|
|
self.assertEqual(len(mail.outbox), 0)
|
|
|
|
with self.assertLogs(logger='helpdesk', level='ERROR') as cm:
|
|
object_from_message(message.as_string(), self.queue_public, self.logger)
|
|
|
|
self.assertIn(
|
|
"ERROR:helpdesk:['Unsupported file extension: .jpg']",
|
|
cm.output
|
|
)
|
|
|
|
self.assertEqual(len(mail.outbox), 1)
|
|
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
|
|
|
def test_multiple_attachments(self):
|
|
"""
|
|
Tests the saving of multiple attachments
|
|
"""
|
|
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'file', 'image'])
|
|
|
|
self.assertEqual(len(mail.outbox), 0)
|
|
|
|
object_from_message(message.as_string(), self.queue_public, self.logger)
|
|
|
|
self.assertEqual(len(mail.outbox), 1)
|
|
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
|
|
|
ticket = Ticket.objects.get()
|
|
followup = ticket.followup_set.get()
|
|
self.assertEqual(2, followup.followupattachment_set.count())
|
|
|
|
@override_settings(VALID_EXTENSIONS=['.txt'])
|
|
def test_multiple_attachments_with_wrong_extension(self):
|
|
"""
|
|
Tests that a wrong extension won't stop from saving other valid attachment
|
|
"""
|
|
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image', 'file', 'image'])
|
|
|
|
self.assertEqual(len(mail.outbox), 0)
|
|
|
|
with self.assertLogs(logger='helpdesk', level='ERROR') as cm:
|
|
object_from_message(message.as_string(), self.queue_public, self.logger)
|
|
|
|
self.assertIn(
|
|
"ERROR:helpdesk:['Unsupported file extension: .jpg']",
|
|
cm.output
|
|
)
|
|
|
|
ticket = Ticket.objects.get()
|
|
followup = ticket.followup_set.get()
|
|
self.assertEqual(1, followup.followupattachment_set.count())
|
|
|
|
|
|
class GetEmailParametricTemplate(object):
|
|
"""TestCase that checks basic email functionality across methods and socks configs."""
|
|
|
|
def setUp(self):
|
|
|
|
self.temp_logdir = mkdtemp()
|
|
kwargs = {
|
|
"title": 'Basic Queue',
|
|
"slug": 'QQ',
|
|
"allow_public_submission": True,
|
|
"allow_email_submission": True,
|
|
"email_box_type": self.method,
|
|
"logging_dir": self.temp_logdir,
|
|
"logging_type": 'none'
|
|
}
|
|
|
|
if self.method == 'local':
|
|
kwargs["email_box_local_dir"] = '/var/lib/mail/helpdesk/'
|
|
else:
|
|
kwargs["email_box_host"] = unrouted_email_server
|
|
kwargs["email_box_port"] = unused_port
|
|
|
|
if self.socks:
|
|
kwargs["socks_proxy_type"] = self.socks
|
|
kwargs["socks_proxy_host"] = unrouted_socks_server
|
|
kwargs["socks_proxy_port"] = unused_port
|
|
|
|
self.queue_public = Queue.objects.create(**kwargs)
|
|
|
|
def tearDown(self):
|
|
|
|
rmtree(self.temp_logdir)
|
|
|
|
def test_read_plain_email(self):
|
|
"""Tests reading plain text emails from a queue and creating tickets.
|
|
For each email source supported, we mock the backend to provide
|
|
authentically formatted responses containing our test data."""
|
|
|
|
# example email text from Django docs:
|
|
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
|
test_email_from = "Arnbjörg Ráðormsdóttir <arnbjorg@example.com>"
|
|
test_email_subject = "My visit to Sør-Trøndelag"
|
|
test_email_body = "Unicode helpdesk comment with an s-hat (ŝ) via email."
|
|
test_email = "To: helpdesk@example.com\nFrom: " + test_email_from + \
|
|
"\nSubject: " + test_email_subject + "\n\n" + test_email_body
|
|
test_mail_len = len(test_email)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
with self.assertRaisesRegex(ProxyConnectionError, '%s:%s' % (unrouted_socks_server, unused_port)):
|
|
call_command('get_email')
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == 'local':
|
|
with mock.patch('os.listdir') as mocked_listdir, \
|
|
mock.patch('helpdesk.email.isfile') as mocked_isfile, \
|
|
mock.patch('builtins.open', mock.mock_open(read_data=test_email)), \
|
|
mock.patch('os.unlink'):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ['filename1', 'filename2']
|
|
|
|
call_command('get_email')
|
|
|
|
mocked_listdir.assert_called_with(
|
|
'/var/lib/mail/helpdesk/')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename1')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename2')
|
|
|
|
elif self.method == 'pop3':
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
'1': ("+OK", test_email.split('\n')),
|
|
'2': ("+OK", test_email.split('\n')),
|
|
}
|
|
pop3_mail_list = ("+OK 2 messages", ("1 %d" %
|
|
test_mail_len, "2 %d" % test_mail_len))
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(
|
|
return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails[x])
|
|
with mock.patch('helpdesk.email.poplib', autospec=True) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(
|
|
return_value=mocked_poplib_server)
|
|
call_command('get_email')
|
|
|
|
elif self.method == 'imap':
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(
|
|
return_value=imap_mail_list)
|
|
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x])
|
|
with mock.patch('helpdesk.email.imaplib', autospec=True) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server)
|
|
call_command('get_email')
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(ticket1.title, test_email_subject)
|
|
self.assertEqual(ticket1.description, test_email_body)
|
|
|
|
ticket2 = get_object_or_404(Ticket, pk=2)
|
|
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
|
self.assertEqual(ticket2.title, test_email_subject)
|
|
self.assertEqual(ticket2.description, test_email_body)
|
|
|
|
def test_commas_in_mail_headers(self):
|
|
"""Tests correctly decoding mail headers when a comma is encoded into
|
|
UTF-8. See bug report #832."""
|
|
|
|
# Create the from using standard RFC required formats
|
|
# Override the last_name to ensure we get a non-ascii character in it
|
|
test_email_from_meta = utils.generate_email_address("fr_FR", last_name_override="Bouissières")
|
|
test_email_subject = "Commas in From lines"
|
|
test_email_body = "Testing commas in from email UTF-8."
|
|
test_email = "To: helpdesk@example.com\nFrom: " + test_email_from_meta[0] + \
|
|
"\nSubject: " + test_email_subject + "\n\n" + test_email_body
|
|
test_mail_len = len(test_email)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
with self.assertRaisesRegex(ProxyConnectionError, '%s:%s' % (unrouted_socks_server, unused_port)):
|
|
call_command('get_email')
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == 'local':
|
|
with mock.patch('os.listdir') as mocked_listdir, \
|
|
mock.patch('helpdesk.email.isfile') as mocked_isfile, \
|
|
mock.patch('builtins.open', mock.mock_open(read_data=test_email)), \
|
|
mock.patch('os.unlink'):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ['filename1', 'filename2']
|
|
|
|
call_command('get_email')
|
|
|
|
mocked_listdir.assert_called_with(
|
|
'/var/lib/mail/helpdesk/')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename1')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename2')
|
|
|
|
elif self.method == 'pop3':
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
'1': ("+OK", test_email.split('\n')),
|
|
'2': ("+OK", test_email.split('\n')),
|
|
}
|
|
pop3_mail_list = ("+OK 2 messages", ("1 %d" %
|
|
test_mail_len, "2 %d" % test_mail_len))
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(
|
|
return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails[x])
|
|
with mock.patch('helpdesk.email.poplib', autospec=True) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(
|
|
return_value=mocked_poplib_server)
|
|
call_command('get_email')
|
|
|
|
elif self.method == 'imap':
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(
|
|
return_value=imap_mail_list)
|
|
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x])
|
|
with mock.patch('helpdesk.email.imaplib', autospec=True) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server)
|
|
call_command('get_email')
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(ticket1.submitter_email, test_email_from_meta[1])
|
|
self.assertEqual(ticket1.title, test_email_subject)
|
|
self.assertEqual(ticket1.description, test_email_body)
|
|
|
|
ticket2 = get_object_or_404(Ticket, pk=2)
|
|
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
|
self.assertEqual(ticket2.submitter_email, test_email_from_meta[1])
|
|
self.assertEqual(ticket2.title, test_email_subject)
|
|
self.assertEqual(ticket2.description, test_email_body)
|
|
|
|
def test_read_email_with_template_tag(self):
|
|
"""Tests reading plain text emails from a queue and creating tickets,
|
|
except this time the email body contains a Django template tag.
|
|
For each email source supported, we mock the backend to provide
|
|
authentically formatted responses containing our test data."""
|
|
|
|
# example email text from Django docs:
|
|
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
|
test_email_from = "Arnbjörg Ráðormsdóttir <arnbjorg@example.com>"
|
|
test_email_subject = "My visit to Sør-Trøndelag"
|
|
test_email_body = "Reporting some issue with the template tag: {% if helpdesk %}."
|
|
test_email = "To: helpdesk@example.com\nFrom: " + test_email_from + \
|
|
"\nSubject: " + test_email_subject + "\n\n" + test_email_body
|
|
test_mail_len = len(test_email)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
with self.assertRaisesRegex(ProxyConnectionError, '%s:%s' % (unrouted_socks_server, unused_port)):
|
|
call_command('get_email')
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == 'local':
|
|
with mock.patch('os.listdir') as mocked_listdir, \
|
|
mock.patch('helpdesk.email.isfile') as mocked_isfile, \
|
|
mock.patch('builtins.open', mock.mock_open(read_data=test_email)), \
|
|
mock.patch('os.unlink'):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ['filename1', 'filename2']
|
|
|
|
call_command('get_email')
|
|
|
|
mocked_listdir.assert_called_with(
|
|
'/var/lib/mail/helpdesk/')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename1')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename2')
|
|
|
|
elif self.method == 'pop3':
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
'1': ("+OK", test_email.split('\n')),
|
|
'2': ("+OK", test_email.split('\n')),
|
|
}
|
|
pop3_mail_list = ("+OK 2 messages", ("1 %d" %
|
|
test_mail_len, "2 %d" % test_mail_len))
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(
|
|
return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails[x])
|
|
with mock.patch('helpdesk.email.poplib', autospec=True) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(
|
|
return_value=mocked_poplib_server)
|
|
call_command('get_email')
|
|
|
|
elif self.method == 'imap':
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(
|
|
return_value=imap_mail_list)
|
|
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x])
|
|
with mock.patch('helpdesk.email.imaplib', autospec=True) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server)
|
|
call_command('get_email')
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(ticket1.title, test_email_subject)
|
|
self.assertEqual(ticket1.description, test_email_body)
|
|
|
|
ticket2 = get_object_or_404(Ticket, pk=2)
|
|
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
|
self.assertEqual(ticket2.title, test_email_subject)
|
|
self.assertEqual(ticket2.description, test_email_body)
|
|
|
|
def test_read_html_multipart_email(self):
|
|
"""Tests reading multipart MIME (HTML body and plain text alternative)
|
|
emails from a queue and creating tickets.
|
|
For each email source supported, we mock the backend to provide
|
|
authentically formatted responses containing our test data."""
|
|
|
|
# example email text from Python docs:
|
|
# https://docs.python.org/3/library/email-examples.html
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
|
|
me = "my@example.com"
|
|
you = "your@example.com"
|
|
# NOTE: CC'd emails need to be alphabetical and tested as such!
|
|
# implementation uses sets, so only way to ensure tickets created
|
|
# in right order is to change set to list and sort it
|
|
cc_one = "nobody@example.com"
|
|
cc_two = "other@example.com"
|
|
cc = cc_one + ", " + cc_two
|
|
subject = "Link"
|
|
|
|
# Create message container - the correct MIME type is
|
|
# multipart/alternative.
|
|
msg = MIMEMultipart('alternative')
|
|
msg['Subject'] = subject
|
|
msg['From'] = me
|
|
msg['To'] = you
|
|
msg['Cc'] = cc
|
|
|
|
# Create the body of the message (a plain-text and an HTML version).
|
|
text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttps://www.python.org"
|
|
html = """\
|
|
<html>
|
|
<head></head>
|
|
<body>
|
|
<p>Hi!<br>
|
|
How are you?<br>
|
|
Here is the <a href="https://www.python.org">link</a> you wanted.
|
|
</p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
# Record the MIME types of both parts - text/plain and text/html.
|
|
part1 = MIMEText(text, 'plain')
|
|
part2 = MIMEText(html, 'html')
|
|
|
|
# Attach parts into message container.
|
|
# According to RFC 2046, the last part of a multipart message, in this case
|
|
# the HTML message, is best and preferred.
|
|
msg.attach(part1)
|
|
msg.attach(part2)
|
|
|
|
test_mail_len = len(msg)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
with self.assertRaisesRegex(ProxyConnectionError, '%s:%s' % (unrouted_socks_server, unused_port)):
|
|
call_command('get_email')
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == 'local':
|
|
with mock.patch('os.listdir') as mocked_listdir, \
|
|
mock.patch('helpdesk.email.isfile') as mocked_isfile, \
|
|
mock.patch('builtins.open', mock.mock_open(read_data=msg.as_string())), \
|
|
mock.patch('os.unlink'):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ['filename1', 'filename2']
|
|
|
|
call_command('get_email')
|
|
|
|
mocked_listdir.assert_called_with(
|
|
'/var/lib/mail/helpdesk/')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename1')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename2')
|
|
|
|
elif self.method == 'pop3':
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
'1': ("+OK", msg.as_string().split('\n')),
|
|
'2': ("+OK", msg.as_string().split('\n')),
|
|
}
|
|
pop3_mail_list = ("+OK 2 messages", ("1 %d" %
|
|
test_mail_len, "2 %d" % test_mail_len))
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(
|
|
return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails[x])
|
|
with mock.patch('helpdesk.email.poplib', autospec=True) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(
|
|
return_value=mocked_poplib_server)
|
|
call_command('get_email')
|
|
|
|
elif self.method == 'imap':
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", msg.as_string()),)),
|
|
"2": ("OK", (("2", msg.as_string()),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(
|
|
return_value=imap_mail_list)
|
|
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x])
|
|
with mock.patch('helpdesk.email.imaplib', autospec=True) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server)
|
|
call_command('get_email')
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(ticket1.title, subject)
|
|
# plain text should become description
|
|
self.assertEqual(ticket1.description, text)
|
|
# HTML MIME part should be attached to follow up
|
|
followup1 = get_object_or_404(FollowUp, pk=1)
|
|
self.assertEqual(followup1.ticket.id, 1)
|
|
attach1 = get_object_or_404(FollowUpAttachment, pk=1)
|
|
self.assertEqual(attach1.followup.id, 1)
|
|
self.assertEqual(attach1.filename, 'email_html_body.html')
|
|
cc0 = get_object_or_404(TicketCC, pk=1)
|
|
self.assertEqual(cc0.email, you)
|
|
cc1 = get_object_or_404(TicketCC, pk=2)
|
|
self.assertEqual(cc1.email, cc_one)
|
|
cc2 = get_object_or_404(TicketCC, pk=3)
|
|
self.assertEqual(cc2.email, cc_two)
|
|
self.assertEqual(len(TicketCC.objects.filter(ticket=1)), 3)
|
|
|
|
ticket2 = get_object_or_404(Ticket, pk=2)
|
|
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
|
self.assertEqual(ticket2.title, subject)
|
|
# plain text should become description
|
|
self.assertEqual(ticket2.description, text)
|
|
# HTML MIME part should be attached to follow up
|
|
followup2 = get_object_or_404(FollowUp, pk=2)
|
|
self.assertEqual(followup2.ticket.id, 2)
|
|
attach2 = get_object_or_404(FollowUpAttachment, pk=2)
|
|
self.assertEqual(attach2.followup.id, 2)
|
|
self.assertEqual(attach2.filename, 'email_html_body.html')
|
|
|
|
def test_read_pgp_signed_email(self):
|
|
"""Tests reading a PGP signed email to ensure we handle base64
|
|
and PGP signatures appropriately."""
|
|
|
|
# example email text from #567 on GitHub
|
|
with open(os.path.join(THIS_DIR, "test_files/pgp.eml")) as fd:
|
|
test_email = fd.read()
|
|
test_mail_len = len(test_email)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
with self.assertRaisesRegex(ProxyConnectionError, '%s:%s' % (unrouted_socks_server, unused_port)):
|
|
call_command('get_email')
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == 'local':
|
|
with mock.patch('os.listdir') as mocked_listdir, \
|
|
mock.patch('helpdesk.email.isfile') as mocked_isfile, \
|
|
mock.patch('builtins.open', mock.mock_open(read_data=test_email)), \
|
|
mock.patch('os.unlink'):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ['filename1']
|
|
|
|
call_command('get_email')
|
|
|
|
mocked_listdir.assert_called_with(
|
|
'/var/lib/mail/helpdesk/')
|
|
mocked_isfile.assert_any_call(
|
|
'/var/lib/mail/helpdesk/filename1')
|
|
|
|
elif self.method == 'pop3':
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
'1': ("+OK", test_email.split('\n')),
|
|
}
|
|
pop3_mail_list = ("+OK 1 message", ("1 %d" % test_mail_len))
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(
|
|
return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails['1'])
|
|
with mock.patch('helpdesk.email.poplib', autospec=True) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(
|
|
return_value=mocked_poplib_server)
|
|
call_command('get_email')
|
|
|
|
elif self.method == 'imap':
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(
|
|
return_value=imap_mail_list)
|
|
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x])
|
|
with mock.patch('helpdesk.email.imaplib', autospec=True) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server)
|
|
call_command('get_email')
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(
|
|
ticket1.title, "example email that crashes django-helpdesk get_email")
|
|
self.assertEqual(
|
|
ticket1.description, """hi, thanks for looking into this :)\n\nhttps://github.com/django-helpdesk/django-helpdesk/issues/567#issuecomment-342954233""")
|
|
# MIME part should be attached to follow up
|
|
followup1 = get_object_or_404(FollowUp, pk=1)
|
|
self.assertEqual(followup1.ticket.id, 1)
|
|
attach1 = get_object_or_404(FollowUpAttachment, pk=1)
|
|
self.assertEqual(attach1.followup.id, 1)
|
|
self.assertEqual(attach1.filename, 'part-1_signature.asc')
|
|
self.assertEqual(attach1.file.read(), b"""-----BEGIN PGP SIGNATURE-----
|
|
|
|
iQIcBAEBCAAGBQJaA3dnAAoJELBLc7QPITnLN54P/3Zsu7+AIQWDFTvziJfCqswG
|
|
u99fG+iWa6ER+iuZG0YU1BdIxIjSKt1pvqB0yXITlT9FCdf1zc0pmeJ08I0a5pVa
|
|
iaym5prVUro5BNQ6Vqoo0jvOCKNrACtFNv85zDzXbPNP8TrUss41U+ackPHkOHov
|
|
cmJ5YZFQebYXXpibFSIDimVGfwI57vyTWvolttZFLSI1mgGX7MvHaKh253QLdXIo
|
|
EUih40rOw3f/nYPEKyW8QA72ImBsZdcZI5buiiCC1bgMkKSFSNAFiIanYEpGNMnO
|
|
3zYKBpbpBhnWSi5orwx47/v4/Yb/qVr5ppuV23+YoMfEGT8cHPTAdYpnpE27ByAv
|
|
jvpxKEwmkUzD1WxOmQdCcPJPyWz1OBUVvjj0nn0Espnz8V8esl9+IFs739lpFBHu
|
|
fWWA315LTmIJMGH5Ujf4myiQeXDo6Gsy6WhE13q7MKTq3tnyi5dJG9GJCBf646dL
|
|
RwcDf9O7MvKSV2kSPmryLnUF7D+2fva+Cy+CvJDVJCo5zr4ucXPXZ4htpI6Pjpd5
|
|
oPHvbqxSCMJrQ7eAFTYmBNGauSyr0XvGM1qmHBZD/laQEJHYgLT2ILrymZhVDHtK
|
|
W7tXhGjMoUvqAxiKkmG3UHFqN4k3EYo13PwoOWyJHD1M9ArbX/Sk9l8DDguCh3DW
|
|
a9eiiQ+3V1v+7wWHXCzq
|
|
=6JeP
|
|
-----END PGP SIGNATURE-----
|
|
""")
|
|
# should this be 'application/pgp-signature'?
|
|
# self.assertEqual(attach1.mime_type, 'text/plain')
|
|
|
|
|
|
class GetEmailCCHandling(TestCase):
|
|
"""TestCase that checks CC handling in email. Needs its own test harness."""
|
|
|
|
def setUp(self):
|
|
self.temp_logdir = mkdtemp()
|
|
|
|
kwargs = {
|
|
"title": 'CC Queue',
|
|
"slug": 'CC',
|
|
"allow_public_submission": True,
|
|
"allow_email_submission": True,
|
|
"email_address": 'queue@example.com',
|
|
"email_box_type": 'local',
|
|
"email_box_local_dir": '/var/lib/mail/helpdesk/',
|
|
"logging_dir": self.temp_logdir,
|
|
"logging_type": 'none'
|
|
}
|
|
self.queue_public = Queue.objects.create(**kwargs)
|
|
|
|
user1_kwargs = {
|
|
'username': 'staff',
|
|
'email': 'staff@example.com',
|
|
'password': make_password('Test1234'),
|
|
'is_staff': True,
|
|
'is_superuser': False,
|
|
'is_active': True
|
|
}
|
|
self.staff_user = User.objects.create(**user1_kwargs)
|
|
|
|
user2_kwargs = {
|
|
'username': 'assigned',
|
|
'email': 'assigned@example.com',
|
|
'password': make_password('Test1234'),
|
|
'is_staff': True,
|
|
'is_superuser': False,
|
|
'is_active': True
|
|
}
|
|
self.assigned_user = User.objects.create(**user2_kwargs)
|
|
|
|
user3_kwargs = {
|
|
'username': 'observer',
|
|
'email': 'observer@example.com',
|
|
'password': make_password('Test1234'),
|
|
'is_staff': True,
|
|
'is_superuser': False,
|
|
'is_active': True
|
|
}
|
|
self.observer_user = User.objects.create(**user3_kwargs)
|
|
|
|
ticket_kwargs = {
|
|
'title': 'Original Ticket',
|
|
'queue': self.queue_public,
|
|
'submitter_email': 'submitter@example.com',
|
|
'assigned_to': self.assigned_user,
|
|
'status': 1
|
|
}
|
|
self.original_ticket = Ticket.objects.create(**ticket_kwargs)
|
|
|
|
cc_kwargs = {
|
|
'ticket': self.original_ticket,
|
|
'user': self.staff_user,
|
|
'can_view': True,
|
|
'can_update': True
|
|
}
|
|
self.original_cc = TicketCC.objects.create(**cc_kwargs)
|
|
|
|
def tearDown(self):
|
|
rmtree(self.temp_logdir)
|
|
|
|
def test_read_email_cc(self):
|
|
"""Tests reading plain text emails from a queue and adding to a ticket,
|
|
particularly to test appropriate handling of CC'd emails."""
|
|
|
|
# first, check that test ticket exists
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "CC-1")
|
|
self.assertEqual(ticket1.title, "Original Ticket")
|
|
# only the staff_user is CC'd for now
|
|
self.assertEqual(len(TicketCC.objects.filter(ticket=1)), 1)
|
|
ccstaff = get_object_or_404(TicketCC, pk=1)
|
|
self.assertEqual(ccstaff.user, User.objects.get(username='staff'))
|
|
self.assertEqual(ticket1.assigned_to,
|
|
User.objects.get(username='assigned'))
|
|
|
|
# example email text from Django docs:
|
|
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
|
test_email_from = "submitter@example.com"
|
|
# NOTE: CC emails are in alphabetical order and must be tested as such!
|
|
# implementation uses sets, so only way to ensure tickets created
|
|
# in right order is to change set to list and sort it
|
|
test_email_cc_one = "Alice Ráðormsdóttir <alice@example.com>"
|
|
test_email_cc_two = "nobody@example.com"
|
|
test_email_cc_three = "other@example.com"
|
|
test_email_cc_four = "someone@example.com"
|
|
ticket_user_emails = "assigned@example.com, staff@example.com, submitter@example.com, observer@example.com, queue@example.com"
|
|
test_email_subject = "[CC-1] My visit to Sør-Trøndelag"
|
|
test_email_body = "Unicode helpdesk comment with an s-hat (ŝ) via email."
|
|
test_email = "To: queue@example.com\nCc: " + test_email_cc_one + ", " + test_email_cc_one + ", " + test_email_cc_two + ", " + test_email_cc_three + "\nCC: " + test_email_cc_one + \
|
|
", " + test_email_cc_three + ", " + test_email_cc_four + ", " + ticket_user_emails + \
|
|
"\nFrom: " + test_email_from + "\nSubject: " + \
|
|
test_email_subject + "\n\n" + test_email_body
|
|
test_mail_len = len(test_email)
|
|
|
|
with mock.patch('os.listdir') as mocked_listdir, \
|
|
mock.patch('helpdesk.email.isfile') as mocked_isfile, \
|
|
mock.patch('builtins.open', mock.mock_open(read_data=test_email)), \
|
|
mock.patch('os.unlink'):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ['filename1']
|
|
|
|
call_command('get_email')
|
|
|
|
mocked_listdir.assert_called_with('/var/lib/mail/helpdesk/')
|
|
mocked_isfile.assert_any_call('/var/lib/mail/helpdesk/filename1')
|
|
|
|
# 9 unique email addresses are CC'd when all is done
|
|
self.assertEqual(len(TicketCC.objects.filter(ticket=1)), 9)
|
|
# next we make sure no duplicates were added, and the
|
|
# staff users nor submitter were not re-added as email TicketCCs
|
|
cc1 = get_object_or_404(TicketCC, pk=1)
|
|
self.assertEqual(cc1.user, User.objects.get(username='staff'))
|
|
cc2 = get_object_or_404(TicketCC, pk=2)
|
|
self.assertEqual(cc2.email, "alice@example.com")
|
|
cc3 = get_object_or_404(TicketCC, pk=3)
|
|
self.assertEqual(cc3.email, test_email_cc_two)
|
|
cc4 = get_object_or_404(TicketCC, pk=4)
|
|
self.assertEqual(cc4.email, test_email_cc_three)
|
|
cc5 = get_object_or_404(TicketCC, pk=5)
|
|
self.assertEqual(cc5.email, test_email_cc_four)
|
|
cc6 = get_object_or_404(TicketCC, pk=6)
|
|
self.assertEqual(cc6.email, "assigned@example.com")
|
|
cc7 = get_object_or_404(TicketCC, pk=7)
|
|
self.assertEqual(cc7.email, "staff@example.com")
|
|
cc8 = get_object_or_404(TicketCC, pk=8)
|
|
self.assertEqual(cc8.email, "submitter@example.com")
|
|
cc9 = get_object_or_404(TicketCC, pk=9)
|
|
self.assertEqual(cc9.user, User.objects.get(username='observer'))
|
|
self.assertEqual(cc9.email, "observer@example.com")
|
|
|
|
|
|
# build matrix of test cases
|
|
case_methods = [c[0] for c in Queue._meta.get_field('email_box_type').choices]
|
|
|
|
# uncomment if you want to run tests with socks - which is much slover
|
|
# case_socks = [False] + [c[0] for c in Queue._meta.get_field('socks_proxy_type').choices]
|
|
case_socks = [False]
|
|
case_matrix = list(itertools.product(case_methods, case_socks))
|
|
|
|
# Populate TestCases from the matrix of parameters
|
|
thismodule = sys.modules[__name__]
|
|
for method, socks in case_matrix:
|
|
|
|
if method == "local" and socks:
|
|
continue
|
|
|
|
socks_str = "Nosocks"
|
|
if socks:
|
|
socks_str = socks.capitalize()
|
|
test_name = str(
|
|
"TestGetEmail%s%s" % (method.capitalize(), socks_str))
|
|
|
|
cl = type(test_name, (GetEmailParametricTemplate, TestCase),
|
|
{"method": method, "socks": socks})
|
|
setattr(thismodule, test_name, cl)
|