mirror of
https://github.com/django-helpdesk/django-helpdesk.git
synced 2025-06-01 15:36:09 +02:00
1573 lines
67 KiB
Python
1573 lines
67 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.contrib.auth.models import User
|
|
from django.core import mail
|
|
from django.core.files.uploadedfile import SimpleUploadedFile
|
|
from django.core.management import call_command
|
|
from django.shortcuts import get_object_or_404
|
|
from django.test import override_settings, TestCase
|
|
from email.mime.message import MIMEMessage
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
import helpdesk.email
|
|
from helpdesk.email import extract_email_metadata, process_as_attachment
|
|
from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException
|
|
from helpdesk.management.commands.get_email import Command
|
|
from helpdesk.models import (
|
|
FollowUp,
|
|
FollowUpAttachment,
|
|
IgnoreEmail,
|
|
Queue,
|
|
Ticket,
|
|
TicketCC,
|
|
)
|
|
from helpdesk.tests import utils
|
|
import itertools
|
|
import logging
|
|
from mock.mock import patch
|
|
from oauthlib.oauth2 import BackendApplicationClient
|
|
import os
|
|
from shutil import rmtree
|
|
import sys
|
|
from tempfile import mkdtemp
|
|
import time
|
|
import typing
|
|
from unittest import mock
|
|
|
|
|
|
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
# class A addresses can't have first octet of 0
|
|
unrouted_socks_server = "0.0.0.1"
|
|
unrouted_email_server = "0.0.0.1"
|
|
# the last user port, reserved by IANA
|
|
unused_port = "49151"
|
|
|
|
fake_time = time.time()
|
|
|
|
|
|
class GetEmailCommonTests(TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super().tearDownClass()
|
|
|
|
def setUp(self):
|
|
self.queue_public = Queue.objects.create(title="Test", slug="test")
|
|
self.logger = logging.getLogger("helpdesk")
|
|
|
|
# tests correct syntax for command line option
|
|
def test_get_email_quiet_option(self):
|
|
"""Test quiet option is properly propagated"""
|
|
# Test get_email with quiet set to True and also False, and verify
|
|
# handle receives quiet option set properly
|
|
for quiet_test_value in [True, False]:
|
|
with mock.patch.object(
|
|
Command, "handle", return_value=None
|
|
) as mocked_handle:
|
|
call_command("get_email", quiet=quiet_test_value)
|
|
mocked_handle.assert_called_once()
|
|
for _, kwargs in mocked_handle.call_args_list:
|
|
self.assertEqual(quiet_test_value, (kwargs["quiet"]))
|
|
|
|
def test_email_with_blank_body_and_attachment(self):
|
|
"""
|
|
Tests that emails with blank bodies and attachments work.
|
|
https://github.com/django-helpdesk/django-helpdesk/issues/700
|
|
"""
|
|
with open(
|
|
os.path.join(THIS_DIR, "test_files/blank-body-with-attachment.eml"),
|
|
encoding="utf-8",
|
|
) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.extract_email_metadata(
|
|
test_email, self.queue_public, self.logger
|
|
)
|
|
# title got truncated because of max_lengh of the model.title field
|
|
assert ticket.title == (
|
|
"Attachment without body - and a loooooooooooooooooooooooooooooooooo"
|
|
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
|
|
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo..."
|
|
)
|
|
self.assertEqual(ticket.description.strip(), "", msg=ticket.description)
|
|
|
|
def test_email_with_quoted_printable_body(self):
|
|
"""
|
|
Tests that emails with quoted-printable bodies work.
|
|
"""
|
|
with open(
|
|
os.path.join(THIS_DIR, "test_files/quoted_printable.eml"), encoding="utf-8"
|
|
) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.extract_email_metadata(
|
|
test_email, self.queue_public, self.logger
|
|
)
|
|
self.assertEqual(ticket.title, "Český test")
|
|
self.assertEqual(
|
|
ticket.description, "Tohle je test českých písmen odeslaných z gmailu."
|
|
)
|
|
followups = FollowUp.objects.filter(ticket=ticket)
|
|
self.assertEqual(len(followups), 1)
|
|
followup = followups[0]
|
|
attachments = FollowUpAttachment.objects.filter(followup=followup)
|
|
self.assertEqual(len(attachments), 1)
|
|
attachment = attachments[0]
|
|
self.assertIn(
|
|
'<div dir="ltr">Tohle je test českých písmen odeslaných z gmailu.</div>\n',
|
|
attachment.file.read().decode("utf-8"),
|
|
)
|
|
|
|
def test_email_with_8bit_encoding_and_utf_8(self):
|
|
"""
|
|
Tests that emails with 8bit transfer encoding and utf-8 charset
|
|
https://github.com/django-helpdesk/django-helpdesk/issues/732
|
|
"""
|
|
with open(
|
|
os.path.join(THIS_DIR, "test_files/all-special-chars.eml"), encoding="utf-8"
|
|
) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.extract_email_metadata(
|
|
test_email, self.queue_public, self.logger
|
|
)
|
|
self.assertEqual(ticket.title, "Testovácí email")
|
|
self.assertEqual(ticket.description, "íářčšáíéřášč")
|
|
|
|
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=False)
|
|
def test_email_with_utf_8_non_decodable_sequences(self):
|
|
"""
|
|
Tests that emails with utf-8 non-decodable sequences are parsed correctly
|
|
The forwarded part of the message must still be in the ticket description if
|
|
HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True.
|
|
Otherwise it should only be in the Followup records Comment field
|
|
NOTE: This test weirdly tests having completely different content in the HTML part
|
|
than the PLAIN part so not sure about the validity of this as a test but
|
|
the code does support this oddity anyway for backwards compatibility
|
|
"""
|
|
with open(
|
|
os.path.join(THIS_DIR, "test_files/utf-nondecodable.eml"), encoding="utf-8"
|
|
) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.extract_email_metadata(
|
|
test_email, self.queue_public, self.logger
|
|
)
|
|
self.assertEqual(ticket.title, "Fwd: Cyklozaměstnavatel - změna vyhodnocení")
|
|
self.assertIn(
|
|
"prosazuje lepší",
|
|
ticket.description,
|
|
'Missing text "prosazuje lepší" in description: %s' % ticket.description,
|
|
)
|
|
followups = FollowUp.objects.filter(ticket=ticket)
|
|
followup = followups[0]
|
|
attachments = FollowUpAttachment.objects.filter(followup=followup)
|
|
attachment = attachments[0]
|
|
self.assertIn("prosazuje lepší", attachment.file.read().decode("utf-8"))
|
|
|
|
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=True)
|
|
def test_email_with_forwarded_message_just_message_stored(self):
|
|
"""
|
|
The forwarded part of the message must still be in the ticket description if
|
|
HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True.
|
|
Otherwise it should only be in the Followup records Comment field
|
|
"""
|
|
with open(
|
|
os.path.join(THIS_DIR, "test_files/forwarded-message.eml"), encoding="utf-8"
|
|
) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.extract_email_metadata(
|
|
test_email, self.queue_public, self.logger
|
|
)
|
|
self.assertEqual(ticket.title, "Test with original message from GitHub")
|
|
self.assertIn("This is email body", ticket.description)
|
|
self.assertTrue("Hello there!" in ticket.description, ticket.description)
|
|
self.assertTrue(FollowUp.objects.filter(ticket=ticket).count() == 1)
|
|
|
|
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=False)
|
|
def test_email_with_forwarded_message_chained_messages_stored(self):
|
|
"""
|
|
The forwarded part of the message must still be in the ticket description if
|
|
HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True.
|
|
Otherwise it should only be in the Followup records Comment field
|
|
"""
|
|
with open(
|
|
os.path.join(THIS_DIR, "test_files/forwarded-message.eml"), encoding="utf-8"
|
|
) as fd:
|
|
test_email = fd.read()
|
|
ticket = helpdesk.email.extract_email_metadata(
|
|
test_email, self.queue_public, self.logger
|
|
)
|
|
self.assertEqual(ticket.title, "Test with original message from GitHub")
|
|
self.assertIn("This is email body", ticket.description)
|
|
self.assertTrue("Hello there!" not in ticket.description, ticket.description)
|
|
followups = FollowUp.objects.filter(ticket=ticket).values("comment")
|
|
self.assertTrue(followups.count() == 1)
|
|
self.assertTrue(
|
|
"Hello there!" in followups[0]["comment"], followups[0]["comment"]
|
|
)
|
|
|
|
def test_will_delete_ignored_email(self):
|
|
"""
|
|
Tests if an email will be ignored if configured to do so and throws the correct exception
|
|
to ensure the email is deleted
|
|
"""
|
|
message, from_meta, _ = utils.generate_text_email(locale="es_ES")
|
|
ignore = IgnoreEmail(
|
|
name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=False
|
|
)
|
|
ignore.save()
|
|
with self.assertRaises(DeleteIgnoredTicketException):
|
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
|
|
|
def test_will_not_delete_ignored_email(self):
|
|
"""
|
|
Tests if an email will be ignored if configured to do so and throws the correct exception
|
|
to ensure the email is NOT deleted
|
|
"""
|
|
message, from_meta, _ = utils.generate_text_email(locale="es_ES")
|
|
ignore = IgnoreEmail(
|
|
name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=True
|
|
)
|
|
ignore.save()
|
|
with self.assertRaises(IgnoreTicketException):
|
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
|
|
|
def test_utf8_filename_attachment(self):
|
|
"""
|
|
Tests if an attachment correctly sent with a UTF8 filename in disposition is extracted correctly
|
|
"""
|
|
filename = "TeléfonoMañana.txt"
|
|
part = utils.generate_file_mime_part(locale="es_ES", filename=filename)
|
|
files: typing.List[SimpleUploadedFile] = []
|
|
process_as_attachment(part, counter=1, files=files, logger=self.logger)
|
|
sent_file: SimpleUploadedFile = files[0]
|
|
# The extractor prepends a part identifier so compare the ending
|
|
self.assertTrue(
|
|
sent_file.name.endswith(filename),
|
|
f"Filename extracted does not match: {sent_file.name}",
|
|
)
|
|
|
|
def test_wrong_extension_attachment(self):
|
|
"""
|
|
Tests if an attachment with a wrong extension doesn't stop the email process
|
|
"""
|
|
message, _, _ = utils.generate_multipart_email(
|
|
type_list=["plain", "executable"]
|
|
)
|
|
|
|
self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable
|
|
|
|
with self.assertLogs(logger="helpdesk", level="ERROR") as cm:
|
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
|
|
|
self.assertIn(
|
|
"ERROR:helpdesk:['Unsupported file extension: .exe']", cm.output
|
|
)
|
|
|
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
|
self.assertEqual(
|
|
f"[test-1] {message.get('subject')} (Opened)", mail.outbox[0].subject
|
|
)
|
|
|
|
def test_multiple_attachments(self):
|
|
"""
|
|
Tests the saving of multiple attachments
|
|
"""
|
|
message, _, _ = utils.generate_multipart_email(
|
|
type_list=["plain", "file", "image"]
|
|
)
|
|
|
|
self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable
|
|
|
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
|
|
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
|
self.assertEqual(
|
|
f"[test-1] {message.get('subject')} (Opened)", mail.outbox[0].subject
|
|
)
|
|
|
|
ticket = Ticket.objects.get()
|
|
followup = ticket.followup_set.get()
|
|
self.assertEqual(2, followup.followupattachment_set.count())
|
|
|
|
def test_multiple_attachments_with_wrong_extension(self):
|
|
"""
|
|
Tests that a wrong extension won't stop from saving other valid attachment
|
|
"""
|
|
message, _, _ = utils.generate_multipart_email(
|
|
type_list=["plain", "executable", "file", "executable"]
|
|
)
|
|
|
|
self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable
|
|
|
|
with self.assertLogs(logger="helpdesk", level="ERROR") as cm:
|
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
|
|
|
self.assertIn(
|
|
"ERROR:helpdesk:['Unsupported file extension: .exe']", cm.output
|
|
)
|
|
|
|
ticket = Ticket.objects.get()
|
|
followup = ticket.followup_set.get()
|
|
self.assertEqual(1, followup.followupattachment_set.count())
|
|
|
|
def test_email_with_multipart_as_attachment(self):
|
|
"""
|
|
Is a multipart attachment to an email correctly saved as an attachment
|
|
"""
|
|
att_filename = "email_attachment.eml"
|
|
message, _, _ = utils.generate_multipart_email(type_list=["plain", "html"])
|
|
email_attachment, _, _ = utils.generate_multipart_email(
|
|
type_list=["plain", "html"]
|
|
)
|
|
att_content = email_attachment.as_string()
|
|
message.attach(
|
|
utils.generate_file_mime_part(filename=att_filename, content=att_content)
|
|
)
|
|
|
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
|
self.assertEqual(
|
|
f"[test-1] {message.get('subject')} (Opened)", mail.outbox[0].subject
|
|
)
|
|
|
|
ticket = Ticket.objects.get()
|
|
followup = ticket.followup_set.get()
|
|
|
|
for att_retrieved in followup.followupattachment_set.all():
|
|
if helpdesk.email.HTML_EMAIL_ATTACHMENT_FILENAME == att_retrieved.filename:
|
|
# Ignore the HTML formatted conntent of the email that is attached
|
|
continue
|
|
self.assertTrue(
|
|
att_retrieved.filename.endswith(att_filename),
|
|
"Filename of attached multipart not detected: %s"
|
|
% (att_retrieved.filename),
|
|
)
|
|
with att_retrieved.file.open("r") as f:
|
|
retrieved_content = f.read()
|
|
self.assertEqual(
|
|
att_content,
|
|
retrieved_content,
|
|
"Retrieved attachment content different to original :\n\n%s\n\n%s"
|
|
% (att_content, retrieved_content),
|
|
)
|
|
|
|
def test_email_with_inline_and_multipart_as_attachments(self):
|
|
"""
|
|
Test a multipart email with an inline attachment and a multipart email attachment to the email
|
|
"""
|
|
inline_att_filename = "inline_attachment.jpg"
|
|
email_att_filename = "email_attachment2.eml"
|
|
# Create the inline attachment for the email using an ID so we can reference it in the email body
|
|
inline_image_id = "liTE5er6Dbnd"
|
|
inline_attachment = utils.generate_image_mime_part(
|
|
locale="en_US",
|
|
imagename=inline_att_filename,
|
|
disposition_primary_type="inline",
|
|
)
|
|
inline_attachment.add_header("X-Attachment-Id", inline_image_id)
|
|
inline_attachment.add_header("Content-ID", "<" + inline_image_id + ">")
|
|
# Create the actual email with its plain and HTML parts
|
|
alt_email_message = MIMEMultipart("alternative")
|
|
# Create the plain and HTML that will reference the inline attachment
|
|
plain_body = (
|
|
"Test with inline image: \n[image: " + inline_att_filename + "]\n\n"
|
|
)
|
|
plain_msg = MIMEText(plain_body)
|
|
alt_email_message.attach(plain_msg)
|
|
html_body = (
|
|
'<div><div>Test with inline image: <img src=3D"cid:'
|
|
+ inline_image_id
|
|
+ '" alt=3D="'
|
|
+ inline_att_filename
|
|
+ '"><br></div>'
|
|
)
|
|
html_msg = MIMEText(html_body, "html")
|
|
alt_email_message.attach(html_msg)
|
|
# Create the email to be attached and attach that as well
|
|
email_to_be_attached, _, _ = utils.generate_multipart_email(
|
|
type_list=["plain", "html"]
|
|
)
|
|
email_as_attachment = MIMEMessage(email_to_be_attached)
|
|
email_as_attachment.add_header(
|
|
"Content-Disposition", "attachment", filename=email_att_filename
|
|
)
|
|
# Now create the base multipart and attach all the other parts to it
|
|
related_message = MIMEMultipart("related")
|
|
related_message.attach(alt_email_message)
|
|
related_message.attach(inline_attachment)
|
|
base_message = MIMEMultipart("mixed")
|
|
base_message.attach(related_message)
|
|
base_message.attach(email_as_attachment)
|
|
utils.add_simple_email_headers(
|
|
base_message, locale="en_US", use_short_email=True
|
|
)
|
|
# Now send the part to the email workflow
|
|
extract_email_metadata(base_message.as_string(), self.queue_public, self.logger)
|
|
|
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
|
self.assertEqual(
|
|
f"[test-1] {base_message.get('subject')} (Opened)", mail.outbox[0].subject
|
|
)
|
|
|
|
ticket = Ticket.objects.get()
|
|
followup = ticket.followup_set.get()
|
|
# Check both the inline and attachment are stored as attached files
|
|
inline_found = False
|
|
email_attachment_found = False
|
|
for att_retrieved in followup.followupattachment_set.all():
|
|
if helpdesk.email.HTML_EMAIL_ATTACHMENT_FILENAME == att_retrieved.filename:
|
|
# Ignore the HTML formatted content of the email that is attached
|
|
continue
|
|
if att_retrieved.filename.endswith(inline_att_filename):
|
|
inline_found = True
|
|
elif att_retrieved.filename.endswith(email_att_filename):
|
|
email_attachment_found = True
|
|
else:
|
|
self.assertTrue(
|
|
False,
|
|
"Unexpected file in ticket attachments: %s"
|
|
% att_retrieved.filename,
|
|
)
|
|
self.assertTrue(
|
|
email_attachment_found,
|
|
"Email attachment file not found ticket attachments: %s"
|
|
% (email_att_filename),
|
|
)
|
|
self.assertTrue(
|
|
inline_found, "Inline file not found in email: %s" % (inline_att_filename)
|
|
)
|
|
|
|
def test_email_with_txt_as_attachment_with_simple_alternative_message(self):
|
|
"""
|
|
Test an email with a txt extension email attachment to the email where the message part of the
|
|
email is in a multipart/alternative directly off the main multipart/mixed (no multipart/related)
|
|
"""
|
|
email_att_filename = "the_quick_brown_fox.txt"
|
|
# Create the actual email with its plain and HTML parts
|
|
alt_email_message = MIMEMultipart("alternative")
|
|
# Create the plain and HTML that will reference the inline attachment
|
|
plain_body = "Is wet birds attached?\n\n"
|
|
plain_msg = MIMEText(plain_body)
|
|
alt_email_message.attach(plain_msg)
|
|
html_body = "<div><div>Is wet birds attached?</div><br></div>"
|
|
html_msg = MIMEText(html_body, "html")
|
|
alt_email_message.attach(html_msg)
|
|
# Now create the base multipart and attach all the other parts to it
|
|
base_message = MIMEMultipart("mixed")
|
|
base_message.attach(alt_email_message)
|
|
email_attachment = MIMEText("Wet birds don't fly at night.")
|
|
email_attachment.add_header(
|
|
"Content-Disposition", "attachment", filename=email_att_filename
|
|
)
|
|
base_message.attach(email_attachment)
|
|
utils.add_simple_email_headers(
|
|
base_message, locale="en_US", use_short_email=True
|
|
)
|
|
# Now send the part to the email workflow
|
|
extract_email_metadata(base_message.as_string(), self.queue_public, self.logger)
|
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
|
# self.assertEqual(f'[test-1] {base_message.get("subject")} (Opened)', mail.outbox[0].subject)
|
|
|
|
ticket = Ticket.objects.get()
|
|
followup = ticket.followup_set.get()
|
|
# Check attachment is stored as attached file
|
|
email_attachment_found = False
|
|
for att_retrieved in followup.followupattachment_set.all():
|
|
if helpdesk.email.HTML_EMAIL_ATTACHMENT_FILENAME == att_retrieved.filename:
|
|
# Ignore the HTML formatted content of the email that is attached
|
|
continue
|
|
if att_retrieved.filename.endswith(email_att_filename):
|
|
email_attachment_found = True
|
|
else:
|
|
self.assertTrue(
|
|
False,
|
|
"Unexpected file in ticket attachments: %s"
|
|
% att_retrieved.filename,
|
|
)
|
|
self.assertTrue(
|
|
email_attachment_found,
|
|
"Email attachment file not found ticket attachments: %s"
|
|
% (email_att_filename),
|
|
)
|
|
|
|
@override_settings(QUEUE_EMAIL_BOX_UPDATE_ONLY=False)
|
|
def test_email_for_new_ticket_when_setting_allows_create(self):
|
|
"""
|
|
A new ticket is created by email if the QUEUE_EMAIL_BOX_UPDATE_ONLY setting is False
|
|
"""
|
|
message, _, _ = utils.generate_text_email(locale="en_GB")
|
|
ticket = helpdesk.email.extract_email_metadata(
|
|
message.as_string(), self.queue_public, self.logger
|
|
)
|
|
self.assertIsNotNone(ticket, "Ticket not created when it should be.")
|
|
|
|
@override_settings(QUEUE_EMAIL_BOX_UPDATE_ONLY=True)
|
|
def test_email_for_no_new_ticket_when_setting_only_allows_update(self):
|
|
"""
|
|
A new ticket cannot be created by email if the QUEUE_EMAIL_BOX_UPDATE_ONLY setting is True
|
|
"""
|
|
message, _, _ = utils.generate_text_email(locale="es_ES")
|
|
ticket = helpdesk.email.extract_email_metadata(
|
|
message.as_string(), self.queue_public, self.logger
|
|
)
|
|
self.assertIsNone(ticket, f"Ticket was created when it should not be: {ticket}"
|
|
)
|
|
|
|
|
|
class EmailTaskTests(TestCase):
|
|
def setUp(self):
|
|
self.num_queues = 5
|
|
self.exception_queues = [2, 4]
|
|
self.q_ids = []
|
|
for i in range(self.num_queues):
|
|
q = Queue.objects.create(
|
|
title=f"Test{i + 1}",
|
|
slug=f"test{i + 1}",
|
|
email_box_type="local",
|
|
allow_email_submission=True,
|
|
)
|
|
self.q_ids.append(q.id)
|
|
self.logger = logging.getLogger("helpdesk")
|
|
|
|
def test_get_email_with_debug_to_stdout_option(self):
|
|
"""Test debug_to_stdout option"""
|
|
# Test get_email with debug_to_stdout set to True and also False, and verify
|
|
# handle receives debug_to_stdout option set properly
|
|
for debug_to_stdout in [True, False]:
|
|
with mock.patch.object(
|
|
Command, "handle", return_value=None
|
|
) as mocked_handle:
|
|
call_command(
|
|
"get_email", "--debug_to_stdout"
|
|
) if debug_to_stdout else call_command("get_email")
|
|
mocked_handle.assert_called_once()
|
|
for _, kwargs in mocked_handle.call_args_list:
|
|
self.assertEqual(debug_to_stdout, (kwargs["debug_to_stdout"]))
|
|
|
|
@patch("helpdesk.email.process_queue")
|
|
def test_get_email_with_queue_failure(self, mocked_process_queue):
|
|
"""Test all queues are processed if specified queues have exceptions"""
|
|
ret_values = [
|
|
Exception(f"Error Q{i};") if (i in self.exception_queues) else None
|
|
for i in range(1, self.num_queues + 1)
|
|
]
|
|
mocked_process_queue.side_effect = ret_values
|
|
call_command(
|
|
"get_email",
|
|
"--debug_to_stdout",
|
|
)
|
|
self.assertEqual(mocked_process_queue.call_count, self.num_queues)
|
|
not_processed_count = Queue.objects.filter(
|
|
email_box_type__isnull=False,
|
|
allow_email_submission=True,
|
|
email_box_last_check__isnull=True,
|
|
).count()
|
|
self.assertEqual(
|
|
not_processed_count,
|
|
len(self.exception_queues),
|
|
"Incorrect number of queues that did not get processed due to a forced exception.",
|
|
)
|
|
|
|
|
|
class GetEmailParametricTemplate(object):
|
|
"""TestCase that checks basic email functionality across methods and socks configs."""
|
|
|
|
def setUp(self):
|
|
self.temp_logdir = mkdtemp()
|
|
kwargs = {
|
|
"title": "Basic Queue",
|
|
"slug": "QQ",
|
|
"allow_public_submission": True,
|
|
"allow_email_submission": True,
|
|
"email_box_type": self.method,
|
|
"logging_dir": self.temp_logdir,
|
|
"logging_type": "none",
|
|
}
|
|
|
|
if self.method == "local":
|
|
kwargs["email_box_local_dir"] = "/var/lib/mail/helpdesk/"
|
|
else:
|
|
kwargs["email_box_host"] = unrouted_email_server
|
|
kwargs["email_box_port"] = unused_port
|
|
|
|
if self.socks:
|
|
kwargs["socks_proxy_type"] = self.socks
|
|
kwargs["socks_proxy_host"] = unrouted_socks_server
|
|
kwargs["socks_proxy_port"] = unused_port
|
|
|
|
self.queue_public = Queue.objects.create(**kwargs)
|
|
|
|
self.token = {
|
|
"token_type": "Bearer",
|
|
"access_token": "asdfoiw37850234lkjsdfsdf",
|
|
"refresh_token": "sldvafkjw34509s8dfsdf",
|
|
"expires_in": "3600",
|
|
"expires_at": fake_time + 3600,
|
|
}
|
|
self.client_id = "foo"
|
|
self.client = BackendApplicationClient(self.client_id)
|
|
|
|
def tearDown(self):
|
|
rmtree(self.temp_logdir)
|
|
|
|
def test_read_plain_email(self):
|
|
"""Tests reading plain text emails from a queue and creating tickets.
|
|
For each email source supported, we mock the backend to provide
|
|
authentically formatted responses containing our test data."""
|
|
# example email text from Django docs:
|
|
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
|
test_email_from = "Arnbjörg Ráðormsdóttir <arnbjorg@example.com>"
|
|
test_email_subject = "My visit to Sør-Trøndelag"
|
|
test_email_body = "Unicode helpdesk comment with an s-hat (ŝ) via email."
|
|
test_email = (
|
|
"To: helpdesk@example.com\nFrom: "
|
|
+ test_email_from
|
|
+ "\nSubject: "
|
|
+ test_email_subject
|
|
+ "\n\n"
|
|
+ test_email_body
|
|
)
|
|
test_mail_len = len(test_email)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
|
|
with self.assertRaisesRegex(
|
|
ProxyConnectionError, "%s:%s" % (unrouted_socks_server, unused_port)
|
|
):
|
|
call_command("get_email")
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == "local":
|
|
with (
|
|
mock.patch("os.listdir") as mocked_listdir,
|
|
mock.patch("helpdesk.email.isfile") as mocked_isfile,
|
|
mock.patch("builtins.open", mock.mock_open(read_data=test_email)),
|
|
mock.patch("os.unlink"),
|
|
):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ["filename1", "filename2"]
|
|
|
|
call_command("get_email")
|
|
|
|
mocked_listdir.assert_called_with("/var/lib/mail/helpdesk/")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename1")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename2")
|
|
|
|
elif self.method == "pop3":
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
"1": ("+OK", test_email.split("\n")),
|
|
"2": ("+OK", test_email.split("\n")),
|
|
}
|
|
pop3_mail_list = (
|
|
"+OK 2 messages",
|
|
("1 %d" % test_mail_len, "2 %d" % test_mail_len),
|
|
)
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.poplib", autospec=True
|
|
) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(return_value=mocked_poplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "imap":
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(return_value=mocked_imaplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "oauth":
|
|
# mock the oauthlib session and requests oauth backendclient
|
|
# then mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
|
|
mocked_oauth_backend_client = mock.Mock()
|
|
with mock.patch(
|
|
"helpdesk.email.oauth2lib", autospec=True
|
|
) as mocked_oauth2lib:
|
|
mocked_oauth2lib.BackendApplicationClient = mock.Mock(
|
|
return_value=mocked_oauth_backend_client
|
|
)
|
|
|
|
mocked_oauth_session = mock.Mock()
|
|
mocked_oauth_session.fetch_token = mock.Mock(return_value={})
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.requests_oauthlib", autospec=True
|
|
) as mocked_requests_oauthlib:
|
|
mocked_requests_oauthlib.OAuth2Session = mock.Mock(
|
|
return_value=mocked_oauth_session
|
|
)
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server
|
|
)
|
|
|
|
call_command("get_email")
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(ticket1.title, test_email_subject)
|
|
self.assertEqual(ticket1.description, test_email_body)
|
|
|
|
ticket2 = get_object_or_404(Ticket, pk=2)
|
|
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
|
self.assertEqual(ticket2.title, test_email_subject)
|
|
self.assertEqual(ticket2.description, test_email_body)
|
|
|
|
def test_commas_in_mail_headers(self):
|
|
"""Tests correctly decoding mail headers when a comma is encoded into
|
|
UTF-8. See bug report #832."""
|
|
# Create the from using standard RFC required formats
|
|
# Override the last_name to ensure we get a non-ascii character in it
|
|
test_email_from_meta = utils.generate_email_address(
|
|
"fr_FR", last_name_override="Bouissières"
|
|
)
|
|
test_email_subject = "Commas in From lines"
|
|
test_email_body = "Testing commas in from email UTF-8."
|
|
test_email = (
|
|
"To: helpdesk@example.com\nFrom: "
|
|
+ test_email_from_meta[0]
|
|
+ "\nSubject: "
|
|
+ test_email_subject
|
|
+ "\n\n"
|
|
+ test_email_body
|
|
)
|
|
test_mail_len = len(test_email)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
|
|
with self.assertRaisesRegex(
|
|
ProxyConnectionError, "%s:%s" % (unrouted_socks_server, unused_port)
|
|
):
|
|
call_command("get_email")
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == "local":
|
|
with (
|
|
mock.patch("os.listdir") as mocked_listdir,
|
|
mock.patch("helpdesk.email.isfile") as mocked_isfile,
|
|
mock.patch("builtins.open", mock.mock_open(read_data=test_email)),
|
|
mock.patch("os.unlink"),
|
|
):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ["filename1", "filename2"]
|
|
|
|
call_command("get_email")
|
|
|
|
mocked_listdir.assert_called_with("/var/lib/mail/helpdesk/")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename1")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename2")
|
|
|
|
elif self.method == "pop3":
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
"1": ("+OK", test_email.split("\n")),
|
|
"2": ("+OK", test_email.split("\n")),
|
|
}
|
|
pop3_mail_list = (
|
|
"+OK 2 messages",
|
|
("1 %d" % test_mail_len, "2 %d" % test_mail_len),
|
|
)
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.poplib", autospec=True
|
|
) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(return_value=mocked_poplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "imap":
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(return_value=mocked_imaplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "oauth":
|
|
# mock the oauthlib session and requests oauth backendclient
|
|
# then mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
|
|
mocked_oauth_backend_client = mock.Mock()
|
|
with mock.patch(
|
|
"helpdesk.email.oauth2lib", autospec=True
|
|
) as mocked_oauth2lib:
|
|
mocked_oauth2lib.BackendApplicationClient = mock.Mock(
|
|
return_value=mocked_oauth_backend_client
|
|
)
|
|
|
|
mocked_oauth_session = mock.Mock()
|
|
mocked_oauth_session.fetch_token = mock.Mock(return_value={})
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.requests_oauthlib", autospec=True
|
|
) as mocked_requests_oauthlib:
|
|
mocked_requests_oauthlib.OAuth2Session = mock.Mock(
|
|
return_value=mocked_oauth_session
|
|
)
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server
|
|
)
|
|
|
|
call_command("get_email")
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(ticket1.submitter_email, test_email_from_meta[1])
|
|
self.assertEqual(ticket1.title, test_email_subject)
|
|
self.assertEqual(ticket1.description, test_email_body)
|
|
|
|
ticket2 = get_object_or_404(Ticket, pk=2)
|
|
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
|
self.assertEqual(ticket2.submitter_email, test_email_from_meta[1])
|
|
self.assertEqual(ticket2.title, test_email_subject)
|
|
self.assertEqual(ticket2.description, test_email_body)
|
|
|
|
def test_read_email_with_template_tag(self):
|
|
"""Tests reading plain text emails from a queue and creating tickets,
|
|
except this time the email body contains a Django template tag.
|
|
For each email source supported, we mock the backend to provide
|
|
authentically formatted responses containing our test data."""
|
|
# example email text from Django docs:
|
|
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
|
test_email_from = "Arnbjörg Ráðormsdóttir <arnbjorg@example.com>"
|
|
test_email_subject = "My visit to Sør-Trøndelag"
|
|
test_email_body = (
|
|
"Reporting some issue with the template tag: {% if helpdesk %}."
|
|
)
|
|
test_email = (
|
|
"To: helpdesk@example.com\nFrom: "
|
|
+ test_email_from
|
|
+ "\nSubject: "
|
|
+ test_email_subject
|
|
+ "\n\n"
|
|
+ test_email_body
|
|
)
|
|
test_mail_len = len(test_email)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
|
|
with self.assertRaisesRegex(
|
|
ProxyConnectionError, "%s:%s" % (unrouted_socks_server, unused_port)
|
|
):
|
|
call_command("get_email")
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == "local":
|
|
with (
|
|
mock.patch("os.listdir") as mocked_listdir,
|
|
mock.patch("helpdesk.email.isfile") as mocked_isfile,
|
|
mock.patch("builtins.open", mock.mock_open(read_data=test_email)),
|
|
mock.patch("os.unlink"),
|
|
):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ["filename1", "filename2"]
|
|
|
|
call_command("get_email")
|
|
|
|
mocked_listdir.assert_called_with("/var/lib/mail/helpdesk/")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename1")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename2")
|
|
|
|
elif self.method == "pop3":
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
"1": ("+OK", test_email.split("\n")),
|
|
"2": ("+OK", test_email.split("\n")),
|
|
}
|
|
pop3_mail_list = (
|
|
"+OK 2 messages",
|
|
("1 %d" % test_mail_len, "2 %d" % test_mail_len),
|
|
)
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.poplib", autospec=True
|
|
) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(return_value=mocked_poplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "imap":
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(return_value=mocked_imaplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "oauth":
|
|
# mock the oauthlib session and requests oauth backendclient
|
|
# then mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
"2": ("OK", (("2", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
|
|
mocked_oauth_backend_client = mock.Mock()
|
|
with mock.patch(
|
|
"helpdesk.email.oauth2lib", autospec=True
|
|
) as mocked_oauth2lib:
|
|
mocked_oauth2lib.BackendApplicationClient = mock.Mock(
|
|
return_value=mocked_oauth_backend_client
|
|
)
|
|
|
|
mocked_oauth_session = mock.Mock()
|
|
mocked_oauth_session.fetch_token = mock.Mock(return_value={})
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.requests_oauthlib", autospec=True
|
|
) as mocked_requests_oauthlib:
|
|
mocked_requests_oauthlib.OAuth2Session = mock.Mock(
|
|
return_value=mocked_oauth_session
|
|
)
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server
|
|
)
|
|
|
|
call_command("get_email")
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(ticket1.title, test_email_subject)
|
|
self.assertEqual(ticket1.description, test_email_body)
|
|
|
|
ticket2 = get_object_or_404(Ticket, pk=2)
|
|
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
|
self.assertEqual(ticket2.title, test_email_subject)
|
|
self.assertEqual(ticket2.description, test_email_body)
|
|
|
|
def test_read_html_multipart_email(self):
|
|
"""Tests reading multipart MIME (HTML body and plain text alternative)
|
|
emails from a queue and creating tickets.
|
|
For each email source supported, we mock the backend to provide
|
|
authentically formatted responses containing our test data."""
|
|
# example email text from Python docs:
|
|
# https://docs.python.org/3/library/email-examples.html
|
|
me = "my@example.com"
|
|
you = "your@example.com"
|
|
# NOTE: CC'd emails need to be alphabetical and tested as such!
|
|
# implementation uses sets, so only way to ensure tickets created
|
|
# in right order is to change set to list and sort it
|
|
cc_one = "nobody@example.com"
|
|
cc_two = "other@example.com"
|
|
cc = cc_one + ", " + cc_two
|
|
subject = "Link"
|
|
# Create message container - the correct MIME type is
|
|
# multipart/alternative.
|
|
msg = MIMEMultipart("alternative")
|
|
msg["Subject"] = subject
|
|
msg["From"] = me
|
|
msg["To"] = you
|
|
msg["Cc"] = cc
|
|
# Create the body of the message (a plain-text and an HTML version).
|
|
text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttps://www.python.org"
|
|
html = """\
|
|
<html>
|
|
<head></head>
|
|
<body>
|
|
<p>Hi!<br>
|
|
How are you?<br>
|
|
Here is the <a href="https://www.python.org">link</a> you wanted.
|
|
</p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
# Record the MIME types of both parts - text/plain and text/html.
|
|
part1 = MIMEText(text, "plain")
|
|
part2 = MIMEText(html, "html")
|
|
# Attach parts into message container.
|
|
# According to RFC 2046, the last part of a multipart message, in this case
|
|
# the HTML message, is best and preferred.
|
|
msg.attach(part1)
|
|
msg.attach(part2)
|
|
|
|
test_mail_len = len(msg)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
|
|
with self.assertRaisesRegex(
|
|
ProxyConnectionError, "%s:%s" % (unrouted_socks_server, unused_port)
|
|
):
|
|
call_command("get_email")
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == "local":
|
|
with (
|
|
mock.patch("os.listdir") as mocked_listdir,
|
|
mock.patch("helpdesk.email.isfile") as mocked_isfile,
|
|
mock.patch(
|
|
"builtins.open", mock.mock_open(read_data=msg.as_string())
|
|
),
|
|
mock.patch("os.unlink"),
|
|
):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ["filename1", "filename2"]
|
|
|
|
call_command("get_email")
|
|
|
|
mocked_listdir.assert_called_with("/var/lib/mail/helpdesk/")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename1")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename2")
|
|
|
|
elif self.method == "pop3":
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
"1": ("+OK", msg.as_string().split("\n")),
|
|
"2": ("+OK", msg.as_string().split("\n")),
|
|
}
|
|
pop3_mail_list = (
|
|
"+OK 2 messages",
|
|
("1 %d" % test_mail_len, "2 %d" % test_mail_len),
|
|
)
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.poplib", autospec=True
|
|
) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(return_value=mocked_poplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "imap":
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", msg.as_string()),)),
|
|
"2": ("OK", (("2", msg.as_string()),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(return_value=mocked_imaplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "oauth":
|
|
# mock the oauthlib session and requests oauth backendclient
|
|
# then mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", msg.as_string()),)),
|
|
"2": ("OK", (("2", msg.as_string()),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1 2",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
|
|
mocked_oauth_backend_client = mock.Mock()
|
|
with mock.patch(
|
|
"helpdesk.email.oauth2lib", autospec=True
|
|
) as mocked_oauth2lib:
|
|
mocked_oauth2lib.BackendApplicationClient = mock.Mock(
|
|
return_value=mocked_oauth_backend_client
|
|
)
|
|
|
|
mocked_oauth_session = mock.Mock()
|
|
mocked_oauth_session.fetch_token = mock.Mock(return_value={})
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.requests_oauthlib", autospec=True
|
|
) as mocked_requests_oauthlib:
|
|
mocked_requests_oauthlib.OAuth2Session = mock.Mock(
|
|
return_value=mocked_oauth_session
|
|
)
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server
|
|
)
|
|
|
|
call_command("get_email")
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(ticket1.title, subject)
|
|
# plain text should become description
|
|
self.assertEqual(ticket1.description, text)
|
|
# HTML MIME part should be attached to follow up
|
|
followup1 = get_object_or_404(FollowUp, pk=1)
|
|
self.assertEqual(followup1.ticket.id, 1)
|
|
attach1 = get_object_or_404(FollowUpAttachment, pk=1)
|
|
self.assertEqual(attach1.followup.id, 1)
|
|
self.assertEqual(attach1.filename, "email_html_body.html")
|
|
cc0 = get_object_or_404(TicketCC, pk=1)
|
|
self.assertEqual(cc0.email, you)
|
|
cc1 = get_object_or_404(TicketCC, pk=2)
|
|
self.assertEqual(cc1.email, cc_one)
|
|
cc2 = get_object_or_404(TicketCC, pk=3)
|
|
self.assertEqual(cc2.email, cc_two)
|
|
self.assertEqual(len(TicketCC.objects.filter(ticket=1)), 3)
|
|
|
|
ticket2 = get_object_or_404(Ticket, pk=2)
|
|
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
|
|
self.assertEqual(ticket2.title, subject)
|
|
# plain text should become description
|
|
self.assertEqual(ticket2.description, text)
|
|
# HTML MIME part should be attached to follow up
|
|
followup2 = get_object_or_404(FollowUp, pk=2)
|
|
self.assertEqual(followup2.ticket.id, 2)
|
|
attach2 = get_object_or_404(FollowUpAttachment, pk=2)
|
|
self.assertEqual(attach2.followup.id, 2)
|
|
self.assertEqual(attach2.filename, "email_html_body.html")
|
|
|
|
def test_read_pgp_signed_email(self):
|
|
"""Tests reading a PGP signed email to ensure we handle base64
|
|
and PGP signatures appropriately."""
|
|
# example email text from #567 on GitHub
|
|
with open(os.path.join(THIS_DIR, "test_files/pgp.eml"), encoding="utf-8") as fd:
|
|
test_email = fd.read()
|
|
test_mail_len = len(test_email)
|
|
|
|
if self.socks:
|
|
from socks import ProxyConnectionError
|
|
|
|
with self.assertRaisesRegex(
|
|
ProxyConnectionError, "%s:%s" % (unrouted_socks_server, unused_port)
|
|
):
|
|
call_command("get_email")
|
|
|
|
else:
|
|
# Test local email reading
|
|
if self.method == "local":
|
|
with (
|
|
mock.patch("os.listdir") as mocked_listdir,
|
|
mock.patch("helpdesk.email.isfile") as mocked_isfile,
|
|
mock.patch("builtins.open", mock.mock_open(read_data=test_email)),
|
|
mock.patch("os.unlink"),
|
|
):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ["filename1"]
|
|
|
|
call_command("get_email")
|
|
|
|
mocked_listdir.assert_called_with("/var/lib/mail/helpdesk/")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename1")
|
|
|
|
elif self.method == "pop3":
|
|
# mock poplib.POP3's list and retr methods to provide responses
|
|
# as per RFC 1939
|
|
pop3_emails = {
|
|
"1": ("+OK", test_email.split("\n")),
|
|
}
|
|
pop3_mail_list = ("+OK 1 message", ("1 %d" % test_mail_len))
|
|
mocked_poplib_server = mock.Mock()
|
|
mocked_poplib_server.list = mock.Mock(return_value=pop3_mail_list)
|
|
mocked_poplib_server.retr = mock.Mock(
|
|
side_effect=lambda x: pop3_emails["1"]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.poplib", autospec=True
|
|
) as mocked_poplib:
|
|
mocked_poplib.POP3 = mock.Mock(return_value=mocked_poplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "imap":
|
|
# mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(return_value=mocked_imaplib_server)
|
|
call_command("get_email")
|
|
|
|
elif self.method == "oauth":
|
|
# mock the oauthlib session and requests oauth backendclient
|
|
# then mock imaplib.IMAP4's search and fetch methods with responses
|
|
# from RFC 3501
|
|
imap_emails = {
|
|
"1": ("OK", (("1", test_email),)),
|
|
}
|
|
imap_mail_list = ("OK", ("1",))
|
|
mocked_imaplib_server = mock.Mock()
|
|
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
|
|
# we ignore the second arg as the data item/mime-part is
|
|
# constant (RFC822)
|
|
mocked_imaplib_server.fetch = mock.Mock(
|
|
side_effect=lambda x, _: imap_emails[x]
|
|
)
|
|
|
|
mocked_oauth_backend_client = mock.Mock()
|
|
with mock.patch(
|
|
"helpdesk.email.oauth2lib", autospec=True
|
|
) as mocked_oauth2lib:
|
|
mocked_oauth2lib.BackendApplicationClient = mock.Mock(
|
|
return_value=mocked_oauth_backend_client
|
|
)
|
|
|
|
mocked_oauth_session = mock.Mock()
|
|
mocked_oauth_session.fetch_token = mock.Mock(return_value={})
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.requests_oauthlib", autospec=True
|
|
) as mocked_requests_oauthlib:
|
|
mocked_requests_oauthlib.OAuth2Session = mock.Mock(
|
|
return_value=mocked_oauth_session
|
|
)
|
|
|
|
with mock.patch(
|
|
"helpdesk.email.imaplib", autospec=True
|
|
) as mocked_imaplib:
|
|
mocked_imaplib.IMAP4 = mock.Mock(
|
|
return_value=mocked_imaplib_server
|
|
)
|
|
|
|
call_command("get_email")
|
|
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
|
|
self.assertEqual(
|
|
ticket1.title, "example email that crashes django-helpdesk get_email"
|
|
)
|
|
self.assertEqual(
|
|
ticket1.description,
|
|
"""hi, thanks for looking into this :)\n\nhttps://github.com/django-helpdesk/django-helpdesk/issues/567#issuecomment-342954233""",
|
|
)
|
|
# MIME part should be attached to follow up
|
|
followup1 = get_object_or_404(FollowUp, pk=1)
|
|
self.assertEqual(followup1.ticket.id, 1)
|
|
attach1 = get_object_or_404(FollowUpAttachment, pk=1)
|
|
self.assertEqual(attach1.followup.id, 1)
|
|
self.assertEqual(attach1.filename, "part-1_signature.asc")
|
|
self.assertEqual(
|
|
attach1.file.read(),
|
|
b"""-----BEGIN PGP SIGNATURE-----
|
|
|
|
iQIcBAEBCAAGBQJaA3dnAAoJELBLc7QPITnLN54P/3Zsu7+AIQWDFTvziJfCqswG
|
|
u99fG+iWa6ER+iuZG0YU1BdIxIjSKt1pvqB0yXITlT9FCdf1zc0pmeJ08I0a5pVa
|
|
iaym5prVUro5BNQ6Vqoo0jvOCKNrACtFNv85zDzXbPNP8TrUss41U+ackPHkOHov
|
|
cmJ5YZFQebYXXpibFSIDimVGfwI57vyTWvolttZFLSI1mgGX7MvHaKh253QLdXIo
|
|
EUih40rOw3f/nYPEKyW8QA72ImBsZdcZI5buiiCC1bgMkKSFSNAFiIanYEpGNMnO
|
|
3zYKBpbpBhnWSi5orwx47/v4/Yb/qVr5ppuV23+YoMfEGT8cHPTAdYpnpE27ByAv
|
|
jvpxKEwmkUzD1WxOmQdCcPJPyWz1OBUVvjj0nn0Espnz8V8esl9+IFs739lpFBHu
|
|
fWWA315LTmIJMGH5Ujf4myiQeXDo6Gsy6WhE13q7MKTq3tnyi5dJG9GJCBf646dL
|
|
RwcDf9O7MvKSV2kSPmryLnUF7D+2fva+Cy+CvJDVJCo5zr4ucXPXZ4htpI6Pjpd5
|
|
oPHvbqxSCMJrQ7eAFTYmBNGauSyr0XvGM1qmHBZD/laQEJHYgLT2ILrymZhVDHtK
|
|
W7tXhGjMoUvqAxiKkmG3UHFqN4k3EYo13PwoOWyJHD1M9ArbX/Sk9l8DDguCh3DW
|
|
a9eiiQ+3V1v+7wWHXCzq
|
|
=6JeP
|
|
-----END PGP SIGNATURE-----
|
|
""",
|
|
)
|
|
# should this be 'application/pgp-signature'?
|
|
# self.assertEqual(attach1.mime_type, 'text/plain')
|
|
|
|
|
|
class GetEmailCCHandling(TestCase):
|
|
"""TestCase that checks CC handling in email. Needs its own test harness."""
|
|
|
|
def setUp(self):
|
|
self.temp_logdir = mkdtemp()
|
|
|
|
kwargs = {
|
|
"title": "CC Queue",
|
|
"slug": "CC",
|
|
"allow_public_submission": True,
|
|
"allow_email_submission": True,
|
|
"email_address": "queue@example.com",
|
|
"email_box_type": "local",
|
|
"email_box_local_dir": "/var/lib/mail/helpdesk/",
|
|
"logging_dir": self.temp_logdir,
|
|
"logging_type": "none",
|
|
}
|
|
self.queue_public = Queue.objects.create(**kwargs)
|
|
|
|
user1_kwargs = {
|
|
"username": "staff",
|
|
"email": "staff@example.com",
|
|
"password": make_password("Test1234"),
|
|
"is_staff": True,
|
|
"is_superuser": False,
|
|
"is_active": True,
|
|
}
|
|
self.staff_user = User.objects.create(**user1_kwargs)
|
|
|
|
user2_kwargs = {
|
|
"username": "assigned",
|
|
"email": "assigned@example.com",
|
|
"password": make_password("Test1234"),
|
|
"is_staff": True,
|
|
"is_superuser": False,
|
|
"is_active": True,
|
|
}
|
|
self.assigned_user = User.objects.create(**user2_kwargs)
|
|
|
|
user3_kwargs = {
|
|
"username": "observer",
|
|
"email": "observer@example.com",
|
|
"password": make_password("Test1234"),
|
|
"is_staff": True,
|
|
"is_superuser": False,
|
|
"is_active": True,
|
|
}
|
|
self.observer_user = User.objects.create(**user3_kwargs)
|
|
|
|
ticket_kwargs = {
|
|
"title": "Original Ticket",
|
|
"queue": self.queue_public,
|
|
"submitter_email": "submitter@example.com",
|
|
"assigned_to": self.assigned_user,
|
|
"status": 1,
|
|
}
|
|
self.original_ticket = Ticket.objects.create(**ticket_kwargs)
|
|
|
|
cc_kwargs = {
|
|
"ticket": self.original_ticket,
|
|
"user": self.staff_user,
|
|
"can_view": True,
|
|
"can_update": True,
|
|
}
|
|
self.original_cc = TicketCC.objects.create(**cc_kwargs)
|
|
|
|
def tearDown(self):
|
|
rmtree(self.temp_logdir)
|
|
|
|
def test_read_email_cc(self):
|
|
"""Tests reading plain text emails from a queue and adding to a ticket,
|
|
particularly to test appropriate handling of CC'd emails."""
|
|
# first, check that test ticket exists
|
|
ticket1 = get_object_or_404(Ticket, pk=1)
|
|
self.assertEqual(ticket1.ticket_for_url, "CC-1")
|
|
self.assertEqual(ticket1.title, "Original Ticket")
|
|
# only the staff_user is CC'd for now
|
|
self.assertEqual(len(TicketCC.objects.filter(ticket=1)), 1)
|
|
ccstaff = get_object_or_404(TicketCC, pk=1)
|
|
self.assertEqual(ccstaff.user, User.objects.get(username="staff"))
|
|
self.assertEqual(ticket1.assigned_to, User.objects.get(username="assigned"))
|
|
# example email text from Django docs:
|
|
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
|
test_email_from = "submitter@example.com"
|
|
# NOTE: CC emails are in alphabetical order and must be tested as such!
|
|
# implementation uses sets, so only way to ensure tickets created
|
|
# in right order is to change set to list and sort it
|
|
test_email_cc_one = "Alice Ráðormsdóttir <alice@example.com>"
|
|
test_email_cc_two = "nobody@example.com"
|
|
test_email_cc_three = "other@example.com"
|
|
test_email_cc_four = "someone@example.com"
|
|
ticket_user_emails = "assigned@example.com, staff@example.com, submitter@example.com, observer@example.com, queue@example.com"
|
|
test_email_subject = "[CC-1] My visit to Sør-Trøndelag"
|
|
test_email_body = "Unicode helpdesk comment with an s-hat (ŝ) via email."
|
|
test_email = (
|
|
"To: queue@example.com\nCc: "
|
|
+ test_email_cc_one
|
|
+ ", "
|
|
+ test_email_cc_one
|
|
+ ", "
|
|
+ test_email_cc_two
|
|
+ ", "
|
|
+ test_email_cc_three
|
|
+ "\nCC: "
|
|
+ test_email_cc_one
|
|
+ ", "
|
|
+ test_email_cc_three
|
|
+ ", "
|
|
+ test_email_cc_four
|
|
+ ", "
|
|
+ ticket_user_emails
|
|
+ "\nFrom: "
|
|
+ test_email_from
|
|
+ "\nSubject: "
|
|
+ test_email_subject
|
|
+ "\n\n"
|
|
+ test_email_body
|
|
)
|
|
|
|
with (
|
|
mock.patch("os.listdir") as mocked_listdir,
|
|
mock.patch("helpdesk.email.isfile") as mocked_isfile,
|
|
mock.patch("builtins.open", mock.mock_open(read_data=test_email)),
|
|
mock.patch("os.unlink"),
|
|
):
|
|
mocked_isfile.return_value = True
|
|
mocked_listdir.return_value = ["filename1"]
|
|
|
|
call_command("get_email")
|
|
|
|
mocked_listdir.assert_called_with("/var/lib/mail/helpdesk/")
|
|
mocked_isfile.assert_any_call("/var/lib/mail/helpdesk/filename1")
|
|
# 9 unique email addresses are CC'd when all is done
|
|
self.assertEqual(len(TicketCC.objects.filter(ticket=1)), 9)
|
|
# next we make sure no duplicates were added, and the
|
|
# staff users nor submitter were not re-added as email TicketCCs
|
|
cc1 = get_object_or_404(TicketCC, pk=1)
|
|
self.assertEqual(cc1.user, User.objects.get(username="staff"))
|
|
cc2 = get_object_or_404(TicketCC, pk=2)
|
|
self.assertEqual(cc2.email, "alice@example.com")
|
|
cc3 = get_object_or_404(TicketCC, pk=3)
|
|
self.assertEqual(cc3.email, test_email_cc_two)
|
|
cc4 = get_object_or_404(TicketCC, pk=4)
|
|
self.assertEqual(cc4.email, test_email_cc_three)
|
|
cc5 = get_object_or_404(TicketCC, pk=5)
|
|
self.assertEqual(cc5.email, test_email_cc_four)
|
|
cc6 = get_object_or_404(TicketCC, pk=6)
|
|
self.assertEqual(cc6.email, "assigned@example.com")
|
|
cc7 = get_object_or_404(TicketCC, pk=7)
|
|
self.assertEqual(cc7.email, "staff@example.com")
|
|
cc8 = get_object_or_404(TicketCC, pk=8)
|
|
self.assertEqual(cc8.email, "submitter@example.com")
|
|
cc9 = get_object_or_404(TicketCC, pk=9)
|
|
self.assertEqual(cc9.user, User.objects.get(username="observer"))
|
|
self.assertEqual(cc9.email, "observer@example.com")
|
|
|
|
|
|
# build matrix of test cases
|
|
case_methods = [c[0] for c in Queue._meta.get_field("email_box_type").choices]
|
|
# uncomment if you want to run tests with socks - which is much slover
|
|
# case_socks = [False] + [c[0] for c in Queue._meta.get_field('socks_proxy_type').choices]
|
|
case_socks = [False]
|
|
case_matrix = list(itertools.product(case_methods, case_socks))
|
|
# Populate TestCases from the matrix of parameters
|
|
thismodule = sys.modules[__name__]
|
|
for method, socks in case_matrix:
|
|
if method == "local" and socks:
|
|
continue
|
|
|
|
socks_str = "Nosocks"
|
|
if socks:
|
|
socks_str = socks.capitalize()
|
|
test_name = str("TestGetEmail%s%s" % (method.capitalize(), socks_str))
|
|
|
|
cl = type(
|
|
test_name,
|
|
(GetEmailParametricTemplate, TestCase),
|
|
{"method": method, "socks": socks},
|
|
)
|
|
setattr(thismodule, test_name, cl)
|