mirror of
https://github.com/django-helpdesk/django-helpdesk.git
synced 2024-12-13 10:21:05 +01:00
Merge branch 'main' into feature/issue-826-dockerize
This commit is contained in:
commit
b9997e2ad0
11
README.rst
11
README.rst
@ -57,7 +57,7 @@ Installation
|
|||||||
`django-helpdesk` requires:
|
`django-helpdesk` requires:
|
||||||
|
|
||||||
* Python 3.8+
|
* Python 3.8+
|
||||||
* Django 3.2 LTS highly recommended (early adopters may test Django 4)
|
* Django 3.2 LTS or Django 4.*
|
||||||
|
|
||||||
You can quickly install the latest stable version of `django-helpdesk`
|
You can quickly install the latest stable version of `django-helpdesk`
|
||||||
app via `pip`::
|
app via `pip`::
|
||||||
@ -78,6 +78,9 @@ Developer Environment
|
|||||||
---------------------
|
---------------------
|
||||||
|
|
||||||
Follow these steps to set up your development environment to contribute to helpdesk:
|
Follow these steps to set up your development environment to contribute to helpdesk:
|
||||||
|
- check out the helpdesk app to your local file system::
|
||||||
|
git clone https://github.com/django-helpdesk/django-helpdesk.git
|
||||||
|
|
||||||
- install a virtual environment
|
- install a virtual environment
|
||||||
- using virtualenv from the helpdesk base folder do::
|
- using virtualenv from the helpdesk base folder do::
|
||||||
virtualenv .venv && source .venv/bin/activate
|
virtualenv .venv && source .venv/bin/activate
|
||||||
@ -85,6 +88,12 @@ Follow these steps to set up your development environment to contribute to helpd
|
|||||||
- install the requirements for development::
|
- install the requirements for development::
|
||||||
pip install -r requirements.txt -r requirements-dev.txt
|
pip install -r requirements.txt -r requirements-dev.txt
|
||||||
|
|
||||||
|
- install the requirements for testing as well::
|
||||||
|
pip install -r requirements.txt -r requirements-dev.txt -r requirements-testing.txt
|
||||||
|
|
||||||
|
To reactivate a VENV just run:
|
||||||
|
source .venv/bin/activate
|
||||||
|
|
||||||
To see option for the Makefile run: `make`
|
To see option for the Makefile run: `make`
|
||||||
|
|
||||||
The project enforces a standardized formatting in the CI/CD pipeline. To ensure you have the correct formatting run::
|
The project enforces a standardized formatting in the CI/CD pipeline. To ensure you have the correct formatting run::
|
||||||
|
@ -37,7 +37,6 @@ If you want to override the default settings for your users, create ``HELPDESK_D
|
|||||||
'tickets_per_page': 25
|
'tickets_per_page': 25
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Generic Options
|
Generic Options
|
||||||
---------------
|
---------------
|
||||||
These changes are visible throughout django-helpdesk
|
These changes are visible throughout django-helpdesk
|
||||||
@ -86,6 +85,10 @@ These changes are visible throughout django-helpdesk
|
|||||||
|
|
||||||
**Default:** ``HELPDESK_MAX_EMAIL_ATTACHMENT_SIZE = 512000``
|
**Default:** ``HELPDESK_MAX_EMAIL_ATTACHMENT_SIZE = 512000``
|
||||||
|
|
||||||
|
- **VALID_EXTENSIONS** Valid extensions for file types that can be attached to tickets
|
||||||
|
|
||||||
|
**Default:** ``VALID_EXTENSIONS = ['.txt', '.asc', '.htm', '.html', '.pdf', '.doc', '.docx', '.odt', '.jpg', '.png', '.eml']
|
||||||
|
|
||||||
- **QUEUE_EMAIL_BOX_UPDATE_ONLY** Only process mail with a valid tracking ID; all other mail will be ignored instead of creating a new ticket.
|
- **QUEUE_EMAIL_BOX_UPDATE_ONLY** Only process mail with a valid tracking ID; all other mail will be ignored instead of creating a new ticket.
|
||||||
|
|
||||||
**Default:** ``QUEUE_EMAIL_BOX_UPDATE_ONLY = False``
|
**Default:** ``QUEUE_EMAIL_BOX_UPDATE_ONLY = False``
|
||||||
|
@ -6,8 +6,6 @@ See LICENSE for details.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
# import base64
|
# import base64
|
||||||
|
|
||||||
|
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from django.conf import settings as django_settings
|
from django.conf import settings as django_settings
|
||||||
@ -18,7 +16,8 @@ from django.db.models import Q
|
|||||||
from django.utils import encoding, timezone
|
from django.utils import encoding, timezone
|
||||||
from django.utils.translation import gettext as _
|
from django.utils.translation import gettext as _
|
||||||
import email
|
import email
|
||||||
from email.message import Message
|
from email import policy
|
||||||
|
from email.message import EmailMessage, MIMEPart
|
||||||
from email.utils import getaddresses
|
from email.utils import getaddresses
|
||||||
from email_reply_parser import EmailReplyParser
|
from email_reply_parser import EmailReplyParser
|
||||||
from helpdesk import settings
|
from helpdesk import settings
|
||||||
@ -39,7 +38,7 @@ import ssl
|
|||||||
import sys
|
import sys
|
||||||
from time import ctime
|
from time import ctime
|
||||||
import typing
|
import typing
|
||||||
from typing import List, Tuple
|
from typing import List
|
||||||
|
|
||||||
|
|
||||||
# import User model, which may be a custom model
|
# import User model, which may be a custom model
|
||||||
@ -53,6 +52,9 @@ STRIPPED_SUBJECT_STRINGS = [
|
|||||||
"Automatic reply: ",
|
"Automatic reply: ",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Allow a custom default attached email name for the HTML formatted email if one is found
|
||||||
|
HTML_EMAIL_ATTACHMENT_FILENAME = _("email_html_body.html")
|
||||||
|
|
||||||
|
|
||||||
def process_email(quiet=False):
|
def process_email(quiet=False):
|
||||||
for q in Queue.objects.filter(
|
for q in Queue.objects.filter(
|
||||||
@ -75,7 +77,6 @@ def process_email(quiet=False):
|
|||||||
logger.propagate = False
|
logger.propagate = False
|
||||||
if quiet:
|
if quiet:
|
||||||
logger.propagate = False # do not propagate to root logger that would log to console
|
logger.propagate = False # do not propagate to root logger that would log to console
|
||||||
|
|
||||||
# Log messages to specific file only if the queue has it configured
|
# Log messages to specific file only if the queue has it configured
|
||||||
if (q.logging_type in logging_types) and q.logging_dir: # if it's enabled and the dir is set
|
if (q.logging_type in logging_types) and q.logging_dir: # if it's enabled and the dir is set
|
||||||
log_file_handler = logging.FileHandler(
|
log_file_handler = logging.FileHandler(
|
||||||
@ -141,7 +142,7 @@ def pop3_sync(q, logger, server):
|
|||||||
full_message = encoding.force_str(
|
full_message = encoding.force_str(
|
||||||
"\n".join(raw_content), errors='replace')
|
"\n".join(raw_content), errors='replace')
|
||||||
try:
|
try:
|
||||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
ticket = extract_email_metadata(message=full_message, queue=q, logger=logger)
|
||||||
except IgnoreTicketException:
|
except IgnoreTicketException:
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"Message %s was ignored and will be left on POP3 server" % msgNum)
|
"Message %s was ignored and will be left on POP3 server" % msgNum)
|
||||||
@ -198,7 +199,7 @@ def imap_sync(q, logger, server):
|
|||||||
data = server.fetch(num, '(RFC822)')[1]
|
data = server.fetch(num, '(RFC822)')[1]
|
||||||
full_message = encoding.force_str(data[0][1], errors='replace')
|
full_message = encoding.force_str(data[0][1], errors='replace')
|
||||||
try:
|
try:
|
||||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
ticket = extract_email_metadata(message=full_message, queue=q, logger=logger)
|
||||||
except IgnoreTicketException:
|
except IgnoreTicketException:
|
||||||
logger.warn("Message %s was ignored and will be left on IMAP server" % num)
|
logger.warn("Message %s was ignored and will be left on IMAP server" % num)
|
||||||
except DeleteIgnoredTicketException:
|
except DeleteIgnoredTicketException:
|
||||||
@ -252,13 +253,11 @@ def imap_oauth_sync(q, logger, server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
server.debug = settings.HELPDESK_IMAP_DEBUG_LEVEL
|
server.debug = settings.HELPDESK_IMAP_DEBUG_LEVEL
|
||||||
|
|
||||||
# TODO: Perhaps store the authentication string template externally? Settings? Queue Table?
|
# TODO: Perhaps store the authentication string template externally? Settings? Queue Table?
|
||||||
server.authenticate(
|
server.authenticate(
|
||||||
"XOAUTH2",
|
"XOAUTH2",
|
||||||
lambda x: f"user={q.email_box_user}\x01auth=Bearer {token['access_token']}\x01\x01".encode(),
|
lambda x: f"user={q.email_box_user}\x01auth=Bearer {token['access_token']}\x01\x01".encode(),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Select the Inbound Mailbox folder
|
# Select the Inbound Mailbox folder
|
||||||
server.select(q.email_box_imap_folder)
|
server.select(q.email_box_imap_folder)
|
||||||
|
|
||||||
@ -285,7 +284,7 @@ def imap_oauth_sync(q, logger, server):
|
|||||||
full_message = encoding.force_str(data[0][1], errors='replace')
|
full_message = encoding.force_str(data[0][1], errors='replace')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
ticket = extract_email_metadata(message=full_message, queue=q, logger=logger)
|
||||||
|
|
||||||
except IgnoreTicketException as itex:
|
except IgnoreTicketException as itex:
|
||||||
logger.warn(f"Message {num} was ignored. {itex}")
|
logger.warn(f"Message {num} was ignored. {itex}")
|
||||||
@ -312,7 +311,6 @@ def imap_oauth_sync(q, logger, server):
|
|||||||
"IMAP retrieve failed. Is the folder '%s' spelled correctly, and does it exist on the server?",
|
"IMAP retrieve failed. Is the folder '%s' spelled correctly, and does it exist on the server?",
|
||||||
q.email_box_imap_folder
|
q.email_box_imap_folder
|
||||||
)
|
)
|
||||||
|
|
||||||
# Purged Flagged Messages & Logout
|
# Purged Flagged Messages & Logout
|
||||||
server.expunge()
|
server.expunge()
|
||||||
server.close()
|
server.close()
|
||||||
@ -405,7 +403,7 @@ def process_queue(q, logger):
|
|||||||
with open(m, 'r') as f:
|
with open(m, 'r') as f:
|
||||||
full_message = encoding.force_str(f.read(), errors='replace')
|
full_message = encoding.force_str(f.read(), errors='replace')
|
||||||
try:
|
try:
|
||||||
ticket = object_from_message(message=full_message, queue=q, logger=logger)
|
ticket = extract_email_metadata(message=full_message, queue=q, logger=logger)
|
||||||
except IgnoreTicketException:
|
except IgnoreTicketException:
|
||||||
logger.warn("Message %d was ignored and will be left in local directory", i)
|
logger.warn("Message %d was ignored and will be left in local directory", i)
|
||||||
except DeleteIgnoredTicketException:
|
except DeleteIgnoredTicketException:
|
||||||
@ -429,7 +427,7 @@ def process_queue(q, logger):
|
|||||||
|
|
||||||
|
|
||||||
def decodeUnknown(charset, string):
|
def decodeUnknown(charset, string):
|
||||||
if type(string) is not str:
|
if string and not isinstance(string, str):
|
||||||
if not charset:
|
if not charset:
|
||||||
try:
|
try:
|
||||||
return str(string, encoding='utf-8', errors='replace')
|
return str(string, encoding='utf-8', errors='replace')
|
||||||
@ -468,11 +466,10 @@ def is_autoreply(message):
|
|||||||
def create_ticket_cc(ticket, cc_list):
|
def create_ticket_cc(ticket, cc_list):
|
||||||
if not cc_list:
|
if not cc_list:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Local import to deal with non-defined / circular reference problem
|
# Local import to deal with non-defined / circular reference problem
|
||||||
from helpdesk.views.staff import subscribe_to_ticket_updates, User
|
|
||||||
|
|
||||||
new_ticket_ccs = []
|
new_ticket_ccs = []
|
||||||
|
from helpdesk.views.staff import subscribe_to_ticket_updates, User
|
||||||
for __, cced_email in cc_list:
|
for __, cced_email in cc_list:
|
||||||
|
|
||||||
cced_email = cced_email.strip()
|
cced_email = cced_email.strip()
|
||||||
@ -538,7 +535,6 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
|
|||||||
ticket.merged_to.ticket)
|
ticket.merged_to.ticket)
|
||||||
# Use the ticket in which it was merged to for next operations
|
# Use the ticket in which it was merged to for next operations
|
||||||
ticket = ticket.merged_to
|
ticket = ticket.merged_to
|
||||||
|
|
||||||
# New issue, create a new <Ticket> instance
|
# New issue, create a new <Ticket> instance
|
||||||
if ticket is None:
|
if ticket is None:
|
||||||
if not settings.QUEUE_EMAIL_BOX_UPDATE_ONLY:
|
if not settings.QUEUE_EMAIL_BOX_UPDATE_ONLY:
|
||||||
@ -555,7 +551,6 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
|
|||||||
(ticket.queue.slug, ticket.id))
|
(ticket.queue.slug, ticket.id))
|
||||||
|
|
||||||
new = True
|
new = True
|
||||||
|
|
||||||
# Old issue being re-opened
|
# Old issue being re-opened
|
||||||
elif ticket.status == Ticket.CLOSED_STATUS:
|
elif ticket.status == Ticket.CLOSED_STATUS:
|
||||||
ticket.status = Ticket.REOPENED_STATUS
|
ticket.status = Ticket.REOPENED_STATUS
|
||||||
@ -723,96 +718,222 @@ def attempt_body_extract_from_html(message: str) -> str:
|
|||||||
return body, full_body
|
return body, full_body
|
||||||
|
|
||||||
|
|
||||||
def extract_part_data(
|
def mime_content_to_string(part: EmailMessage,) -> str:
|
||||||
part: Message,
|
'''
|
||||||
|
Extract the content from the MIME body part
|
||||||
|
:param part: the MIME part to extract the content from
|
||||||
|
'''
|
||||||
|
content_bytes = part.get_payload(decode=True)
|
||||||
|
charset = part.get_content_charset()
|
||||||
|
# The default for MIME email is 7bit which requires special decoding to utf-8 so make sure
|
||||||
|
# we handle the decoding correctly
|
||||||
|
if part['Content-Transfer-Encoding'] in [None, '8bit', '7bit'] and (charset == 'utf-8' or charset is None):
|
||||||
|
charset = "unicode_escape"
|
||||||
|
content = decodeUnknown(charset, content_bytes)
|
||||||
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
def parse_email_content(mime_content: str, is_extract_full_email_msg: bool) -> str:
|
||||||
|
if is_extract_full_email_msg:
|
||||||
|
# Take the full content including encapsulated "forwarded" and "reply" sections
|
||||||
|
return mime_content
|
||||||
|
else:
|
||||||
|
# Just get the primary part of the email and drop off any text below the actual response text
|
||||||
|
return EmailReplyParser.parse_reply(mime_content)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_email_message_content(
|
||||||
|
part: MIMEPart,
|
||||||
|
files: List,
|
||||||
|
include_chained_msgs: bool,
|
||||||
|
) -> (str, str):
|
||||||
|
'''
|
||||||
|
Uses the get_body() method of the email package to extract the email message content.
|
||||||
|
If there is an HTML version of the email message content then it is stored as an attachment.
|
||||||
|
If there is a plain text part then that is used for storing the email content aginst the ticket.
|
||||||
|
Otherwise if there is just an HTML part then the HTML is parsed to extract a simple text message.
|
||||||
|
There is special handling for the case when a multipart/related part holds the message content when
|
||||||
|
there are multiple attachments to the email.
|
||||||
|
:param part: the base email MIME part to be searched
|
||||||
|
:param files: any MIME parts to be attached are added to this list
|
||||||
|
:param include_chained_msgs: flag to indicate if the entire email message content including past
|
||||||
|
replies must be extracted
|
||||||
|
'''
|
||||||
|
message_part: MIMEPart = part.get_body()
|
||||||
|
parent_part: MIMEPart = part
|
||||||
|
content_type = message_part.get_content_type()
|
||||||
|
# Handle the possibility of a related part formatted email
|
||||||
|
if "multipart/related" == content_type:
|
||||||
|
# We want the actual message text so try again on the related MIME part
|
||||||
|
parent_part = message_part
|
||||||
|
message_part = message_part.get_body(preferencelist=["html", "plain",])
|
||||||
|
content_type = message_part.get_content_type()
|
||||||
|
mime_content = None
|
||||||
|
formatted_body = None # Retain the original content by using a secondary variable if the HTML needs wrapping
|
||||||
|
if "text/html" == content_type:
|
||||||
|
# add the HTML message as an attachment wrapping if necessary
|
||||||
|
mime_content = mime_content_to_string(message_part)
|
||||||
|
if "<body" not in mime_content:
|
||||||
|
formatted_body = f"<body>{mime_content}</body>"
|
||||||
|
if "<html" not in mime_content:
|
||||||
|
formatted_body = f"<html><head><meta charset=\"utf-8\" /></head>\
|
||||||
|
{mime_content if formatted_body is None else formatted_body}</html>"
|
||||||
|
files.append(
|
||||||
|
SimpleUploadedFile(
|
||||||
|
HTML_EMAIL_ATTACHMENT_FILENAME,
|
||||||
|
(mime_content if formatted_body is None else formatted_body).encode("utf-8"), 'text/html',
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# Try to get a plain part message
|
||||||
|
plain_message_part = parent_part.get_body(preferencelist=["plain",])
|
||||||
|
if plain_message_part:
|
||||||
|
# Replace mime_content with the plain text part content
|
||||||
|
mime_content = mime_content_to_string(plain_message_part)
|
||||||
|
message_part = plain_message_part
|
||||||
|
content_type = "text/plain"
|
||||||
|
else:
|
||||||
|
# Try to constitute the HTML response as plain text
|
||||||
|
mime_content, _x = attempt_body_extract_from_html(
|
||||||
|
mime_content if formatted_body is None else formatted_body)
|
||||||
|
else:
|
||||||
|
# Is either text/plain or some random content-type so just decode the part content and store as is
|
||||||
|
mime_content = mime_content_to_string(message_part)
|
||||||
|
# We should now have the mime content
|
||||||
|
filtered_body = parse_email_content(mime_content, include_chained_msgs)
|
||||||
|
if not filtered_body or "" == filtered_body.strip():
|
||||||
|
# A unit test that has a different HTML content to plain text which seems an invalid case as email
|
||||||
|
# tools should retain the HTML to be consistent with the plain text but manage this as a special case
|
||||||
|
# Try to constitute the HTML response as plain text
|
||||||
|
if formatted_body:
|
||||||
|
filtered_body, _x = attempt_body_extract_from_html(formatted_body)
|
||||||
|
else:
|
||||||
|
filtered_body = mime_content
|
||||||
|
# Only need the full message if the message_body excludes the chained messages
|
||||||
|
return filtered_body, mime_content
|
||||||
|
|
||||||
|
|
||||||
|
def process_as_attachment(
|
||||||
|
part: MIMEPart,
|
||||||
counter: int,
|
counter: int,
|
||||||
ticket_id: int,
|
|
||||||
files: List,
|
files: List,
|
||||||
logger: logging.Logger
|
logger: logging.Logger
|
||||||
) -> Tuple[str, str]:
|
):
|
||||||
name = part.get_filename()
|
name = part.get_filename()
|
||||||
if name:
|
if name:
|
||||||
name = email.utils.collapse_rfc2231_value(name)
|
name = f"part-{counter}_{email.utils.collapse_rfc2231_value(name)}"
|
||||||
part_body = None
|
|
||||||
part_full_body = None
|
|
||||||
if part.get_content_maintype() == 'text' and name is None:
|
|
||||||
if part.get_content_subtype() == 'plain':
|
|
||||||
part_body = part.get_payload(decode=True)
|
|
||||||
# https://github.com/django-helpdesk/django-helpdesk/issues/732
|
|
||||||
if part['Content-Transfer-Encoding'] == '8bit' and part.get_content_charset() == 'utf-8':
|
|
||||||
part_body = part_body.decode('unicode_escape')
|
|
||||||
part_body = decodeUnknown(part.get_content_charset(), part_body)
|
|
||||||
# have to use django_settings here so overwriting it works in tests
|
|
||||||
# the default value is False anyway
|
|
||||||
if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False):
|
|
||||||
# first message in thread, we save full body to avoid
|
|
||||||
# losing forwards and things like that
|
|
||||||
part_full_body = get_body_from_fragments(part_body)
|
|
||||||
part_body = EmailReplyParser.parse_reply(part_body)
|
|
||||||
else:
|
|
||||||
# second and other reply, save only first part of the
|
|
||||||
# message
|
|
||||||
part_body = EmailReplyParser.parse_reply(part_body)
|
|
||||||
part_full_body = part_body
|
|
||||||
# workaround to get unicode text out rather than escaped text
|
|
||||||
part_body = get_encoded_body(part_body)
|
|
||||||
logger.debug("Discovered plain text MIME part")
|
|
||||||
else:
|
|
||||||
email_body = get_email_body_from_part_payload(part)
|
|
||||||
|
|
||||||
if not part_body and not part_full_body:
|
|
||||||
# no text has been parsed so far - try such deep parsing
|
|
||||||
# for some messages
|
|
||||||
altered_body = email_body.replace(
|
|
||||||
"</p>", "</p>\n").replace("<br", "\n<br")
|
|
||||||
mail = BeautifulSoup(str(altered_body), "html.parser")
|
|
||||||
part_full_body = mail.get_text()
|
|
||||||
|
|
||||||
if "<body" not in email_body:
|
|
||||||
email_body = f"<body>{email_body}</body>"
|
|
||||||
|
|
||||||
payload = (
|
|
||||||
'<html>'
|
|
||||||
'<head>'
|
|
||||||
'<meta charset="utf-8" />'
|
|
||||||
'</head>'
|
|
||||||
'%s'
|
|
||||||
'</html>'
|
|
||||||
) % email_body
|
|
||||||
files.append(
|
|
||||||
SimpleUploadedFile(
|
|
||||||
_("email_html_body.html"), payload.encode("utf-8"), 'text/html')
|
|
||||||
)
|
|
||||||
logger.debug("Discovered HTML MIME part")
|
|
||||||
else:
|
else:
|
||||||
if not name:
|
ext = mimetypes.guess_extension(part.get_content_type())
|
||||||
ext = mimetypes.guess_extension(part.get_content_type())
|
name = f"part-{counter}{ext}"
|
||||||
name = f"part-{counter}{ext}"
|
# Extract payload accounting for attached multiparts
|
||||||
else:
|
payload_bytes = part.as_bytes() if part.is_multipart() else part.get_payload(decode=True)
|
||||||
name = f"part-{counter}_{name}"
|
files.append(SimpleUploadedFile(name, payload_bytes, mimetypes.guess_type(name)[0]))
|
||||||
payload = part.as_string() if part.is_multipart() else part.get_payload(decode=True)
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
files.append(SimpleUploadedFile(name, payload, mimetypes.guess_type(name)[0]))
|
logger.debug("Processed MIME as attachment: %s", name)
|
||||||
logger.debug("Found MIME attachment %s", name)
|
return
|
||||||
return part_body, part_full_body
|
|
||||||
|
|
||||||
|
|
||||||
def object_from_message(message: str,
|
def extract_email_subject(email_msg: EmailMessage,) -> str:
|
||||||
queue: Queue,
|
subject = email_msg.get('subject', _('Comment from e-mail'))
|
||||||
logger: logging.Logger
|
|
||||||
) -> Ticket:
|
|
||||||
# 'message' must be an RFC822 formatted message to correctly parse.
|
|
||||||
message_obj = email.message_from_string(message)
|
|
||||||
|
|
||||||
subject = message_obj.get('subject', _('Comment from e-mail'))
|
|
||||||
subject = decode_mail_headers(
|
subject = decode_mail_headers(
|
||||||
decodeUnknown(message_obj.get_charset(), subject))
|
decodeUnknown(email_msg.get_charset(), subject))
|
||||||
for affix in STRIPPED_SUBJECT_STRINGS:
|
for affix in STRIPPED_SUBJECT_STRINGS:
|
||||||
subject = subject.replace(affix, "")
|
subject = subject.replace(affix, "")
|
||||||
subject = subject.strip()
|
return subject.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def extract_attachments(
|
||||||
|
target_part: MIMEPart,
|
||||||
|
files: List,
|
||||||
|
logger: logging.Logger,
|
||||||
|
counter: int = 1,
|
||||||
|
content_parts_excluded: bool = False,
|
||||||
|
) -> (int, bool):
|
||||||
|
'''
|
||||||
|
If the MIME part is a multipart and not identified as "inline" or "attachment" then
|
||||||
|
iterate over the sub parts recursively.
|
||||||
|
Otherwise extract MIME part content and add as an attachment.
|
||||||
|
It will recursively descend as appropriate ensuring that all parts not part of the message content
|
||||||
|
are added to the list of files to be attached. To cater for the possibility of text/plain and text/html parts
|
||||||
|
that are further down in the multipart hierarchy than the ones that ones meant to provide that content,
|
||||||
|
iterators are selectively used.
|
||||||
|
:param part: the email MIME part to be processed
|
||||||
|
:param files: any MIME part content or MIME parts to be attached are added to this list
|
||||||
|
:param logger: the logger to use for this MIME part processing
|
||||||
|
:param counter: the count of MIME parts added as attachment
|
||||||
|
:param content_parts_excluded: the MIME part(s) that provided the message content have been excluded
|
||||||
|
:returns the count of mime parts added as attachments and a boolean if the content parts have been excluded
|
||||||
|
'''
|
||||||
|
content_type = target_part.get_content_type()
|
||||||
|
content_maintype = target_part.get_content_maintype()
|
||||||
|
if "multipart" == content_maintype and target_part.get_content_disposition() not in ['inline', 'attachment']:
|
||||||
|
# Cycle through all MIME parts in the email extracting the attachments that were not part of the message body
|
||||||
|
# If this is a "related" multipart then we can use the message part excluder iterator directly
|
||||||
|
if "multipart/related" == content_type:
|
||||||
|
if content_parts_excluded:
|
||||||
|
# This should really never happen in a properly constructed email message but...
|
||||||
|
logger.warn(
|
||||||
|
"WARNING! Content type MIME parts have been excluded but a multipart/related has been encountered.\
|
||||||
|
There may be missing information in attachments.")
|
||||||
|
else:
|
||||||
|
content_parts_excluded = True
|
||||||
|
# Use the iterator that automatically excludes message content parts
|
||||||
|
for part in target_part.iter_attachments():
|
||||||
|
counter, content_parts_excluded = extract_attachments(
|
||||||
|
part, files, logger, counter, content_parts_excluded)
|
||||||
|
# The iterator must be different depending on whether we have already excluded message content parts
|
||||||
|
else:
|
||||||
|
# Content part might be 1 or 2 parts but will be at same level so track finding at least 1
|
||||||
|
content_part_detected = False
|
||||||
|
for part in target_part.iter_parts():
|
||||||
|
if not content_parts_excluded and part.get_content_type() in ["text/plain", "text/html"]:
|
||||||
|
content_part_detected = True
|
||||||
|
continue
|
||||||
|
# Recurse into the part to process embedded parts
|
||||||
|
counter, content_parts_excluded = extract_attachments(
|
||||||
|
part, files, logger, counter, content_parts_excluded)
|
||||||
|
# If we have found 1 or more content parts then flag that the content parts have been ommitted
|
||||||
|
# to ensure that other text/* parts are attached
|
||||||
|
if content_part_detected:
|
||||||
|
content_parts_excluded = True
|
||||||
|
else:
|
||||||
|
process_as_attachment(target_part, counter, files, logger)
|
||||||
|
counter = counter + 1
|
||||||
|
return (counter, content_parts_excluded)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_email_metadata(message: str,
|
||||||
|
queue: Queue,
|
||||||
|
logger: logging.Logger
|
||||||
|
) -> Ticket:
|
||||||
|
'''
|
||||||
|
Extracts the text/plain mime part if there is one as the ticket description and
|
||||||
|
stores the text/html part as an attachment if it is present.
|
||||||
|
If no text/plain part is present then it will try to use the text/html part if
|
||||||
|
it is present as the ticket description by removing the HTML formatting.
|
||||||
|
If neither a text/plain or text/html is present then it will use the first text/*
|
||||||
|
MIME part that it finds as the ticket description.
|
||||||
|
By default it will always take only the actual message and drop any chained messages
|
||||||
|
from replies.
|
||||||
|
The HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL settings can force the entire message to be
|
||||||
|
stored in the ticket if it is a new ticket by setting it to True.
|
||||||
|
In this scenario, if it is a reply that is a forwarded message with no actual message,
|
||||||
|
then the description will be sourced from the text/html part and the forwarded message
|
||||||
|
will be in the FollowUp record associated with the ticket.
|
||||||
|
It will iterate over every MIME part and store all MIME parts as attachments apart
|
||||||
|
from the text/plain part.
|
||||||
|
There may be a case for trying to exclude repeated signature images by checking if an
|
||||||
|
attachment of the same name already exists as an attachment on the ticket but that is
|
||||||
|
not implemented.
|
||||||
|
:param message: the raw email message received
|
||||||
|
:param queue: the queue that the message is assigned to
|
||||||
|
:param logger: the logger to be used
|
||||||
|
'''
|
||||||
|
# 'message' must be an RFC822 formatted message to correctly parse.
|
||||||
|
# NBot sure why but policy explicitly set to default is required for any messages with attachments in them
|
||||||
|
message_obj: EmailMessage = email.message_from_string(message, EmailMessage, policy=policy.default)
|
||||||
|
|
||||||
|
subject = extract_email_subject(message_obj)
|
||||||
|
|
||||||
# TODO: Should really be assigning a properly formatted fake email.
|
|
||||||
# Check if anything relies on this being a "real name" formatted string if no sender is found on message_obj.
|
|
||||||
# Also not sure it should be accepting emails from unknown senders
|
|
||||||
sender_email = _('Unknown Sender')
|
sender_email = _('Unknown Sender')
|
||||||
sender_hdr = message_obj.get('from')
|
sender_hdr = message_obj.get('from')
|
||||||
if sender_hdr:
|
if sender_hdr:
|
||||||
@ -831,26 +952,24 @@ def object_from_message(message: str,
|
|||||||
subject,
|
subject,
|
||||||
logger
|
logger
|
||||||
)
|
)
|
||||||
|
|
||||||
body = None
|
|
||||||
full_body = None
|
|
||||||
counter = 0
|
|
||||||
files = []
|
files = []
|
||||||
|
# first message in thread, we save full body to avoid losing forwards and things like that
|
||||||
for part in message_obj.walk():
|
include_chained_msgs = True if ticket_id is None and getattr(
|
||||||
if part.get_content_maintype() == 'multipart':
|
django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False) else False
|
||||||
continue
|
filtered_body, full_body = extract_email_message_content(message_obj, files, include_chained_msgs)
|
||||||
# See email.message_obj.Message.get_filename()
|
# If the base part is not a multipart then it will have already been processed as the vbody content so
|
||||||
part_body, part_full_body = extract_part_data(part, counter, ticket_id, files, logger)
|
# no need to process attachments
|
||||||
if part_body:
|
if "multipart" == message_obj.get_content_maintype():
|
||||||
body = part_body
|
# Find and attach all other parts or part contents as attachments
|
||||||
full_body = part_full_body
|
counter, content_parts_excluded = extract_attachments(message_obj, files, logger)
|
||||||
counter += 1
|
if not content_parts_excluded:
|
||||||
|
# Unexpected situation and may mean there is a hole in the email processing logic
|
||||||
if not body:
|
logger.warning(
|
||||||
body, full_body = attempt_body_extract_from_html(message_obj)
|
"Failed to exclude email content when parsing all MIME parts in the multipart.\
|
||||||
|
Verify that there were no text/* parts containing message content.")
|
||||||
add_file_if_always_save_incoming_email_message(files, message_obj)
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
|
logger.debug("Email parsed and %s attachments were found and attached.", counter)
|
||||||
|
add_file_if_always_save_incoming_email_message(files, message)
|
||||||
|
|
||||||
smtp_priority = message_obj.get('priority', '')
|
smtp_priority = message_obj.get('priority', '')
|
||||||
smtp_importance = message_obj.get('importance', '')
|
smtp_importance = message_obj.get('importance', '')
|
||||||
@ -859,8 +978,8 @@ def object_from_message(message: str,
|
|||||||
smtp_priority, smtp_importance} else 3
|
smtp_priority, smtp_importance} else 3
|
||||||
|
|
||||||
payload = {
|
payload = {
|
||||||
'body': body,
|
'body': filtered_body,
|
||||||
'full_body': full_body or body,
|
'full_body': full_body,
|
||||||
'subject': subject,
|
'subject': subject,
|
||||||
'queue': queue,
|
'queue': queue,
|
||||||
'sender_email': sender_email,
|
'sender_email': sender_email,
|
||||||
|
@ -186,11 +186,11 @@ def format_time_spent(time_spent):
|
|||||||
|
|
||||||
def convert_value(value):
|
def convert_value(value):
|
||||||
""" Convert date/time data type to known fixed format string """
|
""" Convert date/time data type to known fixed format string """
|
||||||
if type(value) == datetime:
|
if type(value) is datetime:
|
||||||
return value.strftime(CUSTOMFIELD_DATETIME_FORMAT)
|
return value.strftime(CUSTOMFIELD_DATETIME_FORMAT)
|
||||||
elif type(value) == date:
|
elif type(value) is date:
|
||||||
return value.strftime(CUSTOMFIELD_DATE_FORMAT)
|
return value.strftime(CUSTOMFIELD_DATE_FORMAT)
|
||||||
elif type(value) == time:
|
elif type(value) is time:
|
||||||
return value.strftime(CUSTOMFIELD_TIME_FORMAT)
|
return value.strftime(CUSTOMFIELD_TIME_FORMAT)
|
||||||
else:
|
else:
|
||||||
return value
|
return value
|
||||||
|
@ -229,7 +229,7 @@ HELPDESK_ENABLE_PER_QUEUE_STAFF_PERMISSION = getattr(
|
|||||||
|
|
||||||
# use https in the email links
|
# use https in the email links
|
||||||
HELPDESK_USE_HTTPS_IN_EMAIL_LINK = getattr(
|
HELPDESK_USE_HTTPS_IN_EMAIL_LINK = getattr(
|
||||||
settings, 'HELPDESK_USE_HTTPS_IN_EMAIL_LINK', False)
|
settings, 'HELPDESK_USE_HTTPS_IN_EMAIL_LINK', settings.SECURE_SSL_REDIRECT)
|
||||||
|
|
||||||
HELPDESK_TEAMS_MODEL = getattr(
|
HELPDESK_TEAMS_MODEL = getattr(
|
||||||
settings, 'HELPDESK_TEAMS_MODEL', 'pinax_teams.Team')
|
settings, 'HELPDESK_TEAMS_MODEL', 'pinax_teams.Team')
|
||||||
|
@ -97,7 +97,7 @@ def send_templated_mail(template_name,
|
|||||||
if isinstance(recipients, str):
|
if isinstance(recipients, str):
|
||||||
if recipients.find(','):
|
if recipients.find(','):
|
||||||
recipients = recipients.split(',')
|
recipients = recipients.split(',')
|
||||||
elif type(recipients) != list:
|
elif type(recipients) is not list:
|
||||||
recipients = [recipients]
|
recipients = [recipients]
|
||||||
|
|
||||||
msg = EmailMultiAlternatives(subject_part, text_part,
|
msg = EmailMultiAlternatives(subject_part, text_part,
|
||||||
|
@ -7,11 +7,14 @@ from django.core.files.uploadedfile import SimpleUploadedFile
|
|||||||
from django.core.management import call_command
|
from django.core.management import call_command
|
||||||
from django.shortcuts import get_object_or_404
|
from django.shortcuts import get_object_or_404
|
||||||
from django.test import override_settings, TestCase
|
from django.test import override_settings, TestCase
|
||||||
|
from email.mime.message import MIMEMessage
|
||||||
|
from email.mime.multipart import MIMEMultipart
|
||||||
|
from email.mime.text import MIMEText
|
||||||
import helpdesk.email
|
import helpdesk.email
|
||||||
from helpdesk.email import extract_part_data, object_from_message
|
from helpdesk.email import extract_email_metadata, process_as_attachment
|
||||||
from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException
|
from helpdesk.exceptions import DeleteIgnoredTicketException, IgnoreTicketException
|
||||||
from helpdesk.management.commands.get_email import Command
|
from helpdesk.management.commands.get_email import Command
|
||||||
from helpdesk.models import Attachment, FollowUp, FollowUpAttachment, IgnoreEmail, Queue, Ticket, TicketCC
|
from helpdesk.models import FollowUp, FollowUpAttachment, IgnoreEmail, Queue, Ticket, TicketCC
|
||||||
from helpdesk.tests import utils
|
from helpdesk.tests import utils
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
@ -59,7 +62,7 @@ class GetEmailCommonTests(TestCase):
|
|||||||
"""
|
"""
|
||||||
with open(os.path.join(THIS_DIR, "test_files/blank-body-with-attachment.eml"), encoding="utf-8") as fd:
|
with open(os.path.join(THIS_DIR, "test_files/blank-body-with-attachment.eml"), encoding="utf-8") as fd:
|
||||||
test_email = fd.read()
|
test_email = fd.read()
|
||||||
ticket = helpdesk.email.object_from_message(
|
ticket = helpdesk.email.extract_email_metadata(
|
||||||
test_email, self.queue_public, self.logger)
|
test_email, self.queue_public, self.logger)
|
||||||
# title got truncated because of max_lengh of the model.title field
|
# title got truncated because of max_lengh of the model.title field
|
||||||
assert ticket.title == (
|
assert ticket.title == (
|
||||||
@ -67,7 +70,7 @@ class GetEmailCommonTests(TestCase):
|
|||||||
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
|
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
|
||||||
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo..."
|
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo..."
|
||||||
)
|
)
|
||||||
self.assertEqual(ticket.description, "")
|
self.assertEqual(ticket.description.strip(), "", msg=ticket.description)
|
||||||
|
|
||||||
def test_email_with_quoted_printable_body(self):
|
def test_email_with_quoted_printable_body(self):
|
||||||
"""
|
"""
|
||||||
@ -75,7 +78,7 @@ class GetEmailCommonTests(TestCase):
|
|||||||
"""
|
"""
|
||||||
with open(os.path.join(THIS_DIR, "test_files/quoted_printable.eml"), encoding="utf-8") as fd:
|
with open(os.path.join(THIS_DIR, "test_files/quoted_printable.eml"), encoding="utf-8") as fd:
|
||||||
test_email = fd.read()
|
test_email = fd.read()
|
||||||
ticket = helpdesk.email.object_from_message(
|
ticket = helpdesk.email.extract_email_metadata(
|
||||||
test_email, self.queue_public, self.logger)
|
test_email, self.queue_public, self.logger)
|
||||||
self.assertEqual(ticket.title, "Český test")
|
self.assertEqual(ticket.title, "Český test")
|
||||||
self.assertEqual(ticket.description,
|
self.assertEqual(ticket.description,
|
||||||
@ -96,24 +99,29 @@ class GetEmailCommonTests(TestCase):
|
|||||||
"""
|
"""
|
||||||
with open(os.path.join(THIS_DIR, "test_files/all-special-chars.eml"), encoding="utf-8") as fd:
|
with open(os.path.join(THIS_DIR, "test_files/all-special-chars.eml"), encoding="utf-8") as fd:
|
||||||
test_email = fd.read()
|
test_email = fd.read()
|
||||||
ticket = helpdesk.email.object_from_message(
|
ticket = helpdesk.email.extract_email_metadata(
|
||||||
test_email, self.queue_public, self.logger)
|
test_email, self.queue_public, self.logger)
|
||||||
self.assertEqual(ticket.title, "Testovácí email")
|
self.assertEqual(ticket.title, "Testovácí email")
|
||||||
self.assertEqual(ticket.description, "íářčšáíéřášč")
|
self.assertEqual(ticket.description, "íářčšáíéřášč")
|
||||||
|
|
||||||
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=True)
|
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=False)
|
||||||
def test_email_with_utf_8_non_decodable_sequences(self):
|
def test_email_with_utf_8_non_decodable_sequences(self):
|
||||||
"""
|
"""
|
||||||
Tests that emails with utf-8 non-decodable sequences are parsed correctly
|
Tests that emails with utf-8 non-decodable sequences are parsed correctly
|
||||||
The message is fowarded as well
|
The forwarded part of the message must still be in the ticket description if
|
||||||
|
HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True.
|
||||||
|
Otherwise it should only be in the Followup records Comment field
|
||||||
|
NOTE: This test weirdly tests having completely different content in the HTML part
|
||||||
|
than the PLAIN part so not sure about the validity of this as a test but
|
||||||
|
the code does support this oddity anyway for backwards compatibility
|
||||||
"""
|
"""
|
||||||
with open(os.path.join(THIS_DIR, "test_files/utf-nondecodable.eml"), encoding="utf-8") as fd:
|
with open(os.path.join(THIS_DIR, "test_files/utf-nondecodable.eml"), encoding="utf-8") as fd:
|
||||||
test_email = fd.read()
|
test_email = fd.read()
|
||||||
ticket = helpdesk.email.object_from_message(
|
ticket = helpdesk.email.extract_email_metadata(
|
||||||
test_email, self.queue_public, self.logger)
|
test_email, self.queue_public, self.logger)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
ticket.title, "Fwd: Cyklozaměstnavatel - změna vyhodnocení")
|
ticket.title, "Fwd: Cyklozaměstnavatel - změna vyhodnocení")
|
||||||
self.assertIn("prosazuje lepší", ticket.description)
|
self.assertIn("prosazuje lepší", ticket.description, "Missing text \"prosazuje lepší\" in description: %s" % ticket.description)
|
||||||
followups = FollowUp.objects.filter(ticket=ticket)
|
followups = FollowUp.objects.filter(ticket=ticket)
|
||||||
followup = followups[0]
|
followup = followups[0]
|
||||||
attachments = FollowUpAttachment.objects.filter(followup=followup)
|
attachments = FollowUpAttachment.objects.filter(followup=followup)
|
||||||
@ -122,21 +130,40 @@ class GetEmailCommonTests(TestCase):
|
|||||||
attachment.file.read().decode("utf-8"))
|
attachment.file.read().decode("utf-8"))
|
||||||
|
|
||||||
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=True)
|
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=True)
|
||||||
def test_email_with_forwarded_message(self):
|
def test_email_with_forwarded_message_just_message_stored(self):
|
||||||
"""
|
"""
|
||||||
Forwarded message of that format must be still attached correctly
|
The forwarded part of the message must still be in the ticket description if
|
||||||
|
HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True.
|
||||||
|
Otherwise it should only be in the Followup records Comment field
|
||||||
"""
|
"""
|
||||||
with open(os.path.join(THIS_DIR, "test_files/forwarded-message.eml"), encoding="utf-8") as fd:
|
with open(os.path.join(THIS_DIR, "test_files/forwarded-message.eml"), encoding="utf-8") as fd:
|
||||||
test_email = fd.read()
|
test_email = fd.read()
|
||||||
ticket = helpdesk.email.object_from_message(
|
ticket = helpdesk.email.extract_email_metadata(
|
||||||
test_email, self.queue_public, self.logger)
|
test_email, self.queue_public, self.logger)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
ticket.title, "Test with original message from GitHub")
|
ticket.title, "Test with original message from GitHub")
|
||||||
self.assertIn("This is email body", ticket.description)
|
self.assertIn("This is email body", ticket.description)
|
||||||
assert "Hello there!" not in ticket.description, ticket.description
|
self.assertTrue("Hello there!" in ticket.description, ticket.description)
|
||||||
assert FollowUp.objects.filter(ticket=ticket).count() == 1
|
self.assertTrue(FollowUp.objects.filter(ticket=ticket).count() == 1)
|
||||||
assert "Hello there!" in FollowUp.objects.filter(
|
|
||||||
ticket=ticket).first().comment
|
@override_settings(HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL=False)
|
||||||
|
def test_email_with_forwarded_message_chained_messages_stored(self):
|
||||||
|
"""
|
||||||
|
The forwarded part of the message must still be in the ticket description if
|
||||||
|
HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL is set to True.
|
||||||
|
Otherwise it should only be in the Followup records Comment field
|
||||||
|
"""
|
||||||
|
with open(os.path.join(THIS_DIR, "test_files/forwarded-message.eml"), encoding="utf-8") as fd:
|
||||||
|
test_email = fd.read()
|
||||||
|
ticket = helpdesk.email.extract_email_metadata(
|
||||||
|
test_email, self.queue_public, self.logger)
|
||||||
|
self.assertEqual(
|
||||||
|
ticket.title, "Test with original message from GitHub")
|
||||||
|
self.assertIn("This is email body", ticket.description)
|
||||||
|
self.assertTrue("Hello there!" not in ticket.description, ticket.description)
|
||||||
|
followups = FollowUp.objects.filter(ticket=ticket).values("comment")
|
||||||
|
self.assertTrue(followups.count() == 1)
|
||||||
|
self.assertTrue("Hello there!" in followups[0]["comment"], followups[0]["comment"])
|
||||||
|
|
||||||
def test_will_delete_ignored_email(self):
|
def test_will_delete_ignored_email(self):
|
||||||
"""
|
"""
|
||||||
@ -147,7 +174,7 @@ class GetEmailCommonTests(TestCase):
|
|||||||
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=False)
|
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=False)
|
||||||
ignore.save()
|
ignore.save()
|
||||||
with self.assertRaises(DeleteIgnoredTicketException):
|
with self.assertRaises(DeleteIgnoredTicketException):
|
||||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
||||||
|
|
||||||
def test_will_not_delete_ignored_email(self):
|
def test_will_not_delete_ignored_email(self):
|
||||||
"""
|
"""
|
||||||
@ -158,7 +185,7 @@ class GetEmailCommonTests(TestCase):
|
|||||||
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=True)
|
ignore = IgnoreEmail(name="Test Ignore", email_address=from_meta[1], keep_in_mailbox=True)
|
||||||
ignore.save()
|
ignore.save()
|
||||||
with self.assertRaises(IgnoreTicketException):
|
with self.assertRaises(IgnoreTicketException):
|
||||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
||||||
|
|
||||||
def test_utf8_filename_attachment(self):
|
def test_utf8_filename_attachment(self):
|
||||||
"""
|
"""
|
||||||
@ -167,7 +194,7 @@ class GetEmailCommonTests(TestCase):
|
|||||||
filename = "TeléfonoMañana.txt"
|
filename = "TeléfonoMañana.txt"
|
||||||
part = utils.generate_file_mime_part(locale="es_ES", filename=filename)
|
part = utils.generate_file_mime_part(locale="es_ES", filename=filename)
|
||||||
files: typing.List[SimpleUploadedFile] = []
|
files: typing.List[SimpleUploadedFile] = []
|
||||||
extract_part_data(part, counter=1, ticket_id="tst1", files=files, logger=self.logger)
|
process_as_attachment(part, counter=1, files=files, logger=self.logger)
|
||||||
sent_file: SimpleUploadedFile = files[0]
|
sent_file: SimpleUploadedFile = files[0]
|
||||||
# The extractor prepends a part identifier so compare the ending
|
# The extractor prepends a part identifier so compare the ending
|
||||||
self.assertTrue(sent_file.name.endswith(filename), f"Filename extracted does not match: {sent_file.name}")
|
self.assertTrue(sent_file.name.endswith(filename), f"Filename extracted does not match: {sent_file.name}")
|
||||||
@ -179,17 +206,17 @@ class GetEmailCommonTests(TestCase):
|
|||||||
"""
|
"""
|
||||||
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image'])
|
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image'])
|
||||||
|
|
||||||
self.assertEqual(len(mail.outbox), 0)
|
self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable
|
||||||
|
|
||||||
with self.assertLogs(logger='helpdesk', level='ERROR') as cm:
|
with self.assertLogs(logger='helpdesk', level='ERROR') as cm:
|
||||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
||||||
|
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
"ERROR:helpdesk:['Unsupported file extension: .jpg']",
|
"ERROR:helpdesk:['Unsupported file extension: .jpg']",
|
||||||
cm.output
|
cm.output
|
||||||
)
|
)
|
||||||
|
|
||||||
self.assertEqual(len(mail.outbox), 1)
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
||||||
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
||||||
|
|
||||||
def test_multiple_attachments(self):
|
def test_multiple_attachments(self):
|
||||||
@ -198,11 +225,11 @@ class GetEmailCommonTests(TestCase):
|
|||||||
"""
|
"""
|
||||||
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'file', 'image'])
|
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'file', 'image'])
|
||||||
|
|
||||||
self.assertEqual(len(mail.outbox), 0)
|
self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable
|
||||||
|
|
||||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
||||||
|
|
||||||
self.assertEqual(len(mail.outbox), 1)
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
||||||
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
||||||
|
|
||||||
ticket = Ticket.objects.get()
|
ticket = Ticket.objects.get()
|
||||||
@ -216,10 +243,10 @@ class GetEmailCommonTests(TestCase):
|
|||||||
"""
|
"""
|
||||||
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image', 'file', 'image'])
|
message, _, _ = utils.generate_multipart_email(type_list=['plain', 'image', 'file', 'image'])
|
||||||
|
|
||||||
self.assertEqual(len(mail.outbox), 0)
|
self.assertEqual(len(mail.outbox), 0) # @UndefinedVariable
|
||||||
|
|
||||||
with self.assertLogs(logger='helpdesk', level='ERROR') as cm:
|
with self.assertLogs(logger='helpdesk', level='ERROR') as cm:
|
||||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
||||||
|
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
"ERROR:helpdesk:['Unsupported file extension: .jpg']",
|
"ERROR:helpdesk:['Unsupported file extension: .jpg']",
|
||||||
@ -240,18 +267,78 @@ class GetEmailCommonTests(TestCase):
|
|||||||
att_content = email_attachment.as_string()
|
att_content = email_attachment.as_string()
|
||||||
message.attach(utils.generate_file_mime_part(filename=att_filename, content=att_content))
|
message.attach(utils.generate_file_mime_part(filename=att_filename, content=att_content))
|
||||||
|
|
||||||
object_from_message(message.as_string(), self.queue_public, self.logger)
|
extract_email_metadata(message.as_string(), self.queue_public, self.logger)
|
||||||
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
||||||
self.assertEqual(len(mail.outbox), 1)
|
|
||||||
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
self.assertEqual(f'[test-1] {message.get("subject")} (Opened)', mail.outbox[0].subject)
|
||||||
|
|
||||||
ticket = Ticket.objects.get()
|
ticket = Ticket.objects.get()
|
||||||
followup = ticket.followup_set.get()
|
followup = ticket.followup_set.get()
|
||||||
att_retrieved: Attachment = followup.followupattachment_set.get()
|
|
||||||
self.assertTrue(att_retrieved.filename.endswith(att_filename), "Filename of attached multipart not detected: %s" % (att_retrieved.filename))
|
for att_retrieved in followup.followupattachment_set.all():
|
||||||
with att_retrieved.file.open('r') as f:
|
if (helpdesk.email.HTML_EMAIL_ATTACHMENT_FILENAME == att_retrieved.filename):
|
||||||
retrieved_content = f.read()
|
# Ignore the HTML formatted conntent of the email that is attached
|
||||||
self.assertEquals(att_content, retrieved_content, "Retrieved attachment content different to original :\n\n%s\n\n%s" % (att_content, retrieved_content))
|
continue
|
||||||
|
self.assertTrue(att_retrieved.filename.endswith(att_filename), "Filename of attached multipart not detected: %s" % (att_retrieved.filename))
|
||||||
|
with att_retrieved.file.open('r') as f:
|
||||||
|
retrieved_content = f.read()
|
||||||
|
self.assertEquals(att_content, retrieved_content, "Retrieved attachment content different to original :\n\n%s\n\n%s" % (att_content, retrieved_content))
|
||||||
|
|
||||||
|
def test_email_with_inline_and_multipart_as_attachments(self):
|
||||||
|
"""
|
||||||
|
Test a multipart email with an inline attachment and a multipart email attachment to the email
|
||||||
|
"""
|
||||||
|
inline_att_filename = 'inline_attachment.jpg'
|
||||||
|
email_att_filename = 'email_attachment2.eml'
|
||||||
|
# Create the inline attachment for the email using an ID so we can reference it in the email body
|
||||||
|
inline_image_id = "liTE5er6Dbnd"
|
||||||
|
inline_attachment = utils.generate_image_mime_part(locale="en_US", imagename=inline_att_filename, disposition_primary_type="inline")
|
||||||
|
inline_attachment.add_header('X-Attachment-Id', inline_image_id)
|
||||||
|
inline_attachment.add_header('Content-ID', '<' + inline_image_id + '>')
|
||||||
|
# Create the actual email with its plain and HTML parts
|
||||||
|
alt_email_message = MIMEMultipart("alternative")
|
||||||
|
# Create the plain and HTML that will reference the inline attachment
|
||||||
|
plain_body = "Test with inline image: \n[image: " + inline_att_filename + "]\n\n"
|
||||||
|
plain_msg = MIMEText(plain_body)
|
||||||
|
alt_email_message.attach(plain_msg)
|
||||||
|
html_body = '<div><div>Test with inline image: <img src=3D"cid:' + inline_image_id + '" alt=3D="' + inline_att_filename + '"><br></div>'
|
||||||
|
html_msg = MIMEText(html_body, "html")
|
||||||
|
alt_email_message.attach(html_msg)
|
||||||
|
# Create the email to be attached and attach that as well
|
||||||
|
email_to_be_attached, _, _ = utils.generate_multipart_email(type_list=['plain', 'html'])
|
||||||
|
email_as_attachment = MIMEMessage(email_to_be_attached)
|
||||||
|
email_as_attachment.add_header('Content-Disposition', 'attachment', filename=email_att_filename)
|
||||||
|
# Now create the base multipart and attach all the other parts to it
|
||||||
|
related_message = MIMEMultipart("related")
|
||||||
|
related_message.attach(alt_email_message)
|
||||||
|
related_message.attach(inline_attachment)
|
||||||
|
base_message = MIMEMultipart("mixed")
|
||||||
|
base_message.attach(related_message)
|
||||||
|
base_message.attach(email_as_attachment)
|
||||||
|
utils.add_simple_email_headers(base_message, locale="en_US", use_short_email=True)
|
||||||
|
# Now send the part to the email workflow
|
||||||
|
extract_email_metadata(base_message.as_string(), self.queue_public, self.logger)
|
||||||
|
|
||||||
|
self.assertEqual(len(mail.outbox), 1) # @UndefinedVariable
|
||||||
|
self.assertEqual(f'[test-1] {base_message.get("subject")} (Opened)', mail.outbox[0].subject)
|
||||||
|
|
||||||
|
ticket = Ticket.objects.get()
|
||||||
|
followup = ticket.followup_set.get()
|
||||||
|
# Check both the inline and attachment are stored as attached files
|
||||||
|
inline_found = False
|
||||||
|
email_attachment_found = False
|
||||||
|
for att_retrieved in followup.followupattachment_set.all():
|
||||||
|
if (helpdesk.email.HTML_EMAIL_ATTACHMENT_FILENAME == att_retrieved.filename):
|
||||||
|
# Ignore the HTML formatted content of the email that is attached
|
||||||
|
continue
|
||||||
|
if att_retrieved.filename.endswith(inline_att_filename):
|
||||||
|
inline_found = True
|
||||||
|
elif att_retrieved.filename.endswith(email_att_filename):
|
||||||
|
email_attachment_found = True
|
||||||
|
else:
|
||||||
|
print(f"\n\n%%%%%% {att_retrieved}")
|
||||||
|
self.assertTrue(False, "Unexpected file in ticket attachments: %s" % att_retrieved.filename)
|
||||||
|
self.assertTrue(email_attachment_found, "Email attachment file not found ticket attachments: %s" % (email_att_filename))
|
||||||
|
self.assertTrue(inline_found, "Inline file not found in email: %s" % (inline_att_filename))
|
||||||
|
|
||||||
|
|
||||||
class GetEmailParametricTemplate(object):
|
class GetEmailParametricTemplate(object):
|
||||||
@ -424,7 +511,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
def test_commas_in_mail_headers(self):
|
def test_commas_in_mail_headers(self):
|
||||||
"""Tests correctly decoding mail headers when a comma is encoded into
|
"""Tests correctly decoding mail headers when a comma is encoded into
|
||||||
UTF-8. See bug report #832."""
|
UTF-8. See bug report #832."""
|
||||||
|
|
||||||
# Create the from using standard RFC required formats
|
# Create the from using standard RFC required formats
|
||||||
# Override the last_name to ensure we get a non-ascii character in it
|
# Override the last_name to ensure we get a non-ascii character in it
|
||||||
test_email_from_meta = utils.generate_email_address("fr_FR", last_name_override="Bouissières")
|
test_email_from_meta = utils.generate_email_address("fr_FR", last_name_override="Bouissières")
|
||||||
@ -488,7 +574,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
mocked_imaplib_server = mock.Mock()
|
mocked_imaplib_server = mock.Mock()
|
||||||
mocked_imaplib_server.search = mock.Mock(
|
mocked_imaplib_server.search = mock.Mock(
|
||||||
return_value=imap_mail_list)
|
return_value=imap_mail_list)
|
||||||
|
|
||||||
# we ignore the second arg as the data item/mime-part is
|
# we ignore the second arg as the data item/mime-part is
|
||||||
# constant (RFC822)
|
# constant (RFC822)
|
||||||
mocked_imaplib_server.fetch = mock.Mock(
|
mocked_imaplib_server.fetch = mock.Mock(
|
||||||
@ -510,7 +595,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
mocked_imaplib_server = mock.Mock()
|
mocked_imaplib_server = mock.Mock()
|
||||||
mocked_imaplib_server.search = mock.Mock(
|
mocked_imaplib_server.search = mock.Mock(
|
||||||
return_value=imap_mail_list)
|
return_value=imap_mail_list)
|
||||||
|
|
||||||
# we ignore the second arg as the data item/mime-part is
|
# we ignore the second arg as the data item/mime-part is
|
||||||
# constant (RFC822)
|
# constant (RFC822)
|
||||||
mocked_imaplib_server.fetch = mock.Mock(
|
mocked_imaplib_server.fetch = mock.Mock(
|
||||||
@ -553,7 +637,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
except this time the email body contains a Django template tag.
|
except this time the email body contains a Django template tag.
|
||||||
For each email source supported, we mock the backend to provide
|
For each email source supported, we mock the backend to provide
|
||||||
authentically formatted responses containing our test data."""
|
authentically formatted responses containing our test data."""
|
||||||
|
|
||||||
# example email text from Django docs:
|
# example email text from Django docs:
|
||||||
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
# https://docs.djangoproject.com/en/1.10/ref/unicode/
|
||||||
test_email_from = "Arnbjörg Ráðormsdóttir <arnbjorg@example.com>"
|
test_email_from = "Arnbjörg Ráðormsdóttir <arnbjorg@example.com>"
|
||||||
@ -617,7 +700,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
mocked_imaplib_server = mock.Mock()
|
mocked_imaplib_server = mock.Mock()
|
||||||
mocked_imaplib_server.search = mock.Mock(
|
mocked_imaplib_server.search = mock.Mock(
|
||||||
return_value=imap_mail_list)
|
return_value=imap_mail_list)
|
||||||
|
|
||||||
# we ignore the second arg as the data item/mime-part is
|
# we ignore the second arg as the data item/mime-part is
|
||||||
# constant (RFC822)
|
# constant (RFC822)
|
||||||
mocked_imaplib_server.fetch = mock.Mock(
|
mocked_imaplib_server.fetch = mock.Mock(
|
||||||
@ -639,7 +721,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
mocked_imaplib_server = mock.Mock()
|
mocked_imaplib_server = mock.Mock()
|
||||||
mocked_imaplib_server.search = mock.Mock(
|
mocked_imaplib_server.search = mock.Mock(
|
||||||
return_value=imap_mail_list)
|
return_value=imap_mail_list)
|
||||||
|
|
||||||
# we ignore the second arg as the data item/mime-part is
|
# we ignore the second arg as the data item/mime-part is
|
||||||
# constant (RFC822)
|
# constant (RFC822)
|
||||||
mocked_imaplib_server.fetch = mock.Mock(
|
mocked_imaplib_server.fetch = mock.Mock(
|
||||||
@ -680,12 +761,8 @@ class GetEmailParametricTemplate(object):
|
|||||||
emails from a queue and creating tickets.
|
emails from a queue and creating tickets.
|
||||||
For each email source supported, we mock the backend to provide
|
For each email source supported, we mock the backend to provide
|
||||||
authentically formatted responses containing our test data."""
|
authentically formatted responses containing our test data."""
|
||||||
|
|
||||||
# example email text from Python docs:
|
# example email text from Python docs:
|
||||||
# https://docs.python.org/3/library/email-examples.html
|
# https://docs.python.org/3/library/email-examples.html
|
||||||
from email.mime.multipart import MIMEMultipart
|
|
||||||
from email.mime.text import MIMEText
|
|
||||||
|
|
||||||
me = "my@example.com"
|
me = "my@example.com"
|
||||||
you = "your@example.com"
|
you = "your@example.com"
|
||||||
# NOTE: CC'd emails need to be alphabetical and tested as such!
|
# NOTE: CC'd emails need to be alphabetical and tested as such!
|
||||||
@ -695,7 +772,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
cc_two = "other@example.com"
|
cc_two = "other@example.com"
|
||||||
cc = cc_one + ", " + cc_two
|
cc = cc_one + ", " + cc_two
|
||||||
subject = "Link"
|
subject = "Link"
|
||||||
|
|
||||||
# Create message container - the correct MIME type is
|
# Create message container - the correct MIME type is
|
||||||
# multipart/alternative.
|
# multipart/alternative.
|
||||||
msg = MIMEMultipart('alternative')
|
msg = MIMEMultipart('alternative')
|
||||||
@ -703,7 +779,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
msg['From'] = me
|
msg['From'] = me
|
||||||
msg['To'] = you
|
msg['To'] = you
|
||||||
msg['Cc'] = cc
|
msg['Cc'] = cc
|
||||||
|
|
||||||
# Create the body of the message (a plain-text and an HTML version).
|
# Create the body of the message (a plain-text and an HTML version).
|
||||||
text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttps://www.python.org"
|
text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttps://www.python.org"
|
||||||
html = """\
|
html = """\
|
||||||
@ -717,11 +792,9 @@ class GetEmailParametricTemplate(object):
|
|||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Record the MIME types of both parts - text/plain and text/html.
|
# Record the MIME types of both parts - text/plain and text/html.
|
||||||
part1 = MIMEText(text, 'plain')
|
part1 = MIMEText(text, 'plain')
|
||||||
part2 = MIMEText(html, 'html')
|
part2 = MIMEText(html, 'html')
|
||||||
|
|
||||||
# Attach parts into message container.
|
# Attach parts into message container.
|
||||||
# According to RFC 2046, the last part of a multipart message, in this case
|
# According to RFC 2046, the last part of a multipart message, in this case
|
||||||
# the HTML message, is best and preferred.
|
# the HTML message, is best and preferred.
|
||||||
@ -785,7 +858,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
mocked_imaplib_server = mock.Mock()
|
mocked_imaplib_server = mock.Mock()
|
||||||
mocked_imaplib_server.search = mock.Mock(
|
mocked_imaplib_server.search = mock.Mock(
|
||||||
return_value=imap_mail_list)
|
return_value=imap_mail_list)
|
||||||
|
|
||||||
# we ignore the second arg as the data item/mime-part is
|
# we ignore the second arg as the data item/mime-part is
|
||||||
# constant (RFC822)
|
# constant (RFC822)
|
||||||
mocked_imaplib_server.fetch = mock.Mock(
|
mocked_imaplib_server.fetch = mock.Mock(
|
||||||
@ -807,7 +879,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
mocked_imaplib_server = mock.Mock()
|
mocked_imaplib_server = mock.Mock()
|
||||||
mocked_imaplib_server.search = mock.Mock(
|
mocked_imaplib_server.search = mock.Mock(
|
||||||
return_value=imap_mail_list)
|
return_value=imap_mail_list)
|
||||||
|
|
||||||
# we ignore the second arg as the data item/mime-part is
|
# we ignore the second arg as the data item/mime-part is
|
||||||
# constant (RFC822)
|
# constant (RFC822)
|
||||||
mocked_imaplib_server.fetch = mock.Mock(
|
mocked_imaplib_server.fetch = mock.Mock(
|
||||||
@ -867,7 +938,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
def test_read_pgp_signed_email(self):
|
def test_read_pgp_signed_email(self):
|
||||||
"""Tests reading a PGP signed email to ensure we handle base64
|
"""Tests reading a PGP signed email to ensure we handle base64
|
||||||
and PGP signatures appropriately."""
|
and PGP signatures appropriately."""
|
||||||
|
|
||||||
# example email text from #567 on GitHub
|
# example email text from #567 on GitHub
|
||||||
with open(os.path.join(THIS_DIR, "test_files/pgp.eml"), encoding="utf-8") as fd:
|
with open(os.path.join(THIS_DIR, "test_files/pgp.eml"), encoding="utf-8") as fd:
|
||||||
test_email = fd.read()
|
test_email = fd.read()
|
||||||
@ -923,7 +993,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
mocked_imaplib_server = mock.Mock()
|
mocked_imaplib_server = mock.Mock()
|
||||||
mocked_imaplib_server.search = mock.Mock(
|
mocked_imaplib_server.search = mock.Mock(
|
||||||
return_value=imap_mail_list)
|
return_value=imap_mail_list)
|
||||||
|
|
||||||
# we ignore the second arg as the data item/mime-part is
|
# we ignore the second arg as the data item/mime-part is
|
||||||
# constant (RFC822)
|
# constant (RFC822)
|
||||||
mocked_imaplib_server.fetch = mock.Mock(
|
mocked_imaplib_server.fetch = mock.Mock(
|
||||||
@ -944,7 +1013,6 @@ class GetEmailParametricTemplate(object):
|
|||||||
mocked_imaplib_server = mock.Mock()
|
mocked_imaplib_server = mock.Mock()
|
||||||
mocked_imaplib_server.search = mock.Mock(
|
mocked_imaplib_server.search = mock.Mock(
|
||||||
return_value=imap_mail_list)
|
return_value=imap_mail_list)
|
||||||
|
|
||||||
# we ignore the second arg as the data item/mime-part is
|
# we ignore the second arg as the data item/mime-part is
|
||||||
# constant (RFC822)
|
# constant (RFC822)
|
||||||
mocked_imaplib_server.fetch = mock.Mock(
|
mocked_imaplib_server.fetch = mock.Mock(
|
||||||
@ -1001,8 +1069,6 @@ a9eiiQ+3V1v+7wWHXCzq
|
|||||||
""")
|
""")
|
||||||
# should this be 'application/pgp-signature'?
|
# should this be 'application/pgp-signature'?
|
||||||
# self.assertEqual(attach1.mime_type, 'text/plain')
|
# self.assertEqual(attach1.mime_type, 'text/plain')
|
||||||
|
|
||||||
|
|
||||||
class GetEmailCCHandling(TestCase):
|
class GetEmailCCHandling(TestCase):
|
||||||
"""TestCase that checks CC handling in email. Needs its own test harness."""
|
"""TestCase that checks CC handling in email. Needs its own test harness."""
|
||||||
|
|
||||||
@ -1102,7 +1168,6 @@ class GetEmailCCHandling(TestCase):
|
|||||||
", " + test_email_cc_three + ", " + test_email_cc_four + ", " + ticket_user_emails + \
|
", " + test_email_cc_three + ", " + test_email_cc_four + ", " + ticket_user_emails + \
|
||||||
"\nFrom: " + test_email_from + "\nSubject: " + \
|
"\nFrom: " + test_email_from + "\nSubject: " + \
|
||||||
test_email_subject + "\n\n" + test_email_body
|
test_email_subject + "\n\n" + test_email_body
|
||||||
test_mail_len = len(test_email)
|
|
||||||
|
|
||||||
with mock.patch('os.listdir') as mocked_listdir, \
|
with mock.patch('os.listdir') as mocked_listdir, \
|
||||||
mock.patch('helpdesk.email.isfile') as mocked_isfile, \
|
mock.patch('helpdesk.email.isfile') as mocked_isfile, \
|
||||||
|
@ -58,12 +58,13 @@ class QueryTests(TestCase):
|
|||||||
query = query_to_base64({})
|
query = query_to_base64({})
|
||||||
response = self.client.get(
|
response = self.client.get(
|
||||||
reverse('helpdesk:datatables_ticket_list', args=[query]))
|
reverse('helpdesk:datatables_ticket_list', args=[query]))
|
||||||
|
resp_json = response.json()
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
response.json(),
|
resp_json,
|
||||||
{
|
{
|
||||||
"data":
|
"data":
|
||||||
[{"ticket": "1 [test_queue-1]", "id": 1, "priority": 3, "title": "unassigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", "created": "now", "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": ""},
|
[{"ticket": "1 [test_queue-1]", "id": 1, "priority": 3, "title": "unassigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", "created": resp_json["data"][0]["created"], "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": ""},
|
||||||
{"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", "created": "now", "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}],
|
{"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open", "created": resp_json["data"][1]["created"], "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}],
|
||||||
"recordsFiltered": 2,
|
"recordsFiltered": 2,
|
||||||
"recordsTotal": 2,
|
"recordsTotal": 2,
|
||||||
"draw": 0,
|
"draw": 0,
|
||||||
@ -77,12 +78,13 @@ class QueryTests(TestCase):
|
|||||||
)
|
)
|
||||||
response = self.client.get(
|
response = self.client.get(
|
||||||
reverse('helpdesk:datatables_ticket_list', args=[query]))
|
reverse('helpdesk:datatables_ticket_list', args=[query]))
|
||||||
|
resp_json = response.json()
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
response.json(),
|
resp_json,
|
||||||
{
|
{
|
||||||
"data":
|
"data":
|
||||||
[{"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open",
|
[{"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open",
|
||||||
"created": "now", "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}],
|
"created": resp_json["data"][0]["created"], "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}],
|
||||||
"recordsFiltered": 1,
|
"recordsFiltered": 1,
|
||||||
"recordsTotal": 1,
|
"recordsTotal": 1,
|
||||||
"draw": 0,
|
"draw": 0,
|
||||||
@ -96,12 +98,13 @@ class QueryTests(TestCase):
|
|||||||
)
|
)
|
||||||
response = self.client.get(
|
response = self.client.get(
|
||||||
reverse('helpdesk:datatables_ticket_list', args=[query]))
|
reverse('helpdesk:datatables_ticket_list', args=[query]))
|
||||||
|
resp_json = response.json()
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
response.json(),
|
resp_json,
|
||||||
{
|
{
|
||||||
"data":
|
"data":
|
||||||
[{"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open",
|
[{"ticket": "2 [test_queue-2]", "id": 2, "priority": 3, "title": "assigned to kbitem", "queue": {"title": "Test queue", "id": 1}, "status": "Open",
|
||||||
"created": "now", "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}],
|
"created": resp_json["data"][0]["created"], "due_date": None, "assigned_to": "None", "submitter": None, "row_class": "", "time_spent": "", "kbitem": "KBItem 1"}],
|
||||||
"recordsFiltered": 1,
|
"recordsFiltered": 1,
|
||||||
"recordsTotal": 1,
|
"recordsTotal": 1,
|
||||||
"draw": 0,
|
"draw": 0,
|
||||||
|
@ -2,15 +2,12 @@
|
|||||||
|
|
||||||
from django.contrib.auth import get_user_model
|
from django.contrib.auth import get_user_model
|
||||||
from django.core import mail
|
from django.core import mail
|
||||||
from django.core.exceptions import ObjectDoesNotExist
|
|
||||||
from django.forms import ValidationError
|
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
from django.test.client import Client
|
from django.test.client import Client
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
import email
|
import email
|
||||||
from helpdesk.email import create_ticket_cc, object_from_message
|
from helpdesk.email import extract_email_metadata
|
||||||
from helpdesk.models import CustomField, FollowUp, KBCategory, KBItem, Queue, Ticket, TicketCC
|
from helpdesk.models import CustomField, FollowUp, KBCategory, KBItem, Queue, Ticket, TicketCC
|
||||||
from helpdesk.tests.helpers import print_response
|
|
||||||
import logging
|
import logging
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
import uuid
|
import uuid
|
||||||
@ -285,7 +282,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -317,7 +314,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
ticket = Ticket.objects.get(
|
ticket = Ticket.objects.get(
|
||||||
title=self.ticket_data['title'], queue=self.queue_public, submitter_email=submitter_email)
|
title=self.ticket_data['title'], queue=self.queue_public, submitter_email=submitter_email)
|
||||||
@ -353,7 +350,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -400,7 +397,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -452,7 +449,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -498,7 +495,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -532,7 +529,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
reply.__setitem__('Content-Type', 'text/plain;')
|
reply.__setitem__('Content-Type', 'text/plain;')
|
||||||
reply.set_payload(self.ticket_data['description'])
|
reply.set_payload(self.ticket_data['description'])
|
||||||
|
|
||||||
object_from_message(str(reply), self.queue_public, logger=logger)
|
extract_email_metadata(str(reply), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -588,7 +585,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -632,7 +629,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
reply.__setitem__('Content-Type', 'text/plain;')
|
reply.__setitem__('Content-Type', 'text/plain;')
|
||||||
reply.set_payload(self.ticket_data['description'])
|
reply.set_payload(self.ticket_data['description'])
|
||||||
|
|
||||||
object_from_message(str(reply), self.queue_public, logger=logger)
|
extract_email_metadata(str(reply), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -686,7 +683,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -741,7 +738,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(reply), self.queue_public, logger=logger)
|
extract_email_metadata(str(reply), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -783,7 +780,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
msg.set_payload(self.ticket_data['description'])
|
msg.set_payload(self.ticket_data['description'])
|
||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -833,7 +830,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(
|
extract_email_metadata(
|
||||||
str(msg), self.queue_public_with_notifications_disabled, logger=logger)
|
str(msg), self.queue_public_with_notifications_disabled, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
@ -880,7 +877,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -918,7 +915,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
reply.__setitem__('Content-Type', 'text/plain;')
|
reply.__setitem__('Content-Type', 'text/plain;')
|
||||||
reply.set_payload(self.ticket_data['description'])
|
reply.set_payload(self.ticket_data['description'])
|
||||||
|
|
||||||
object_from_message(str(reply), self.queue_public, logger=logger)
|
extract_email_metadata(str(reply), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -964,7 +961,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(
|
extract_email_metadata(
|
||||||
str(msg), self.queue_public_with_notifications_disabled, logger=logger)
|
str(msg), self.queue_public_with_notifications_disabled, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
@ -1003,7 +1000,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
reply.__setitem__('Content-Type', 'text/plain;')
|
reply.__setitem__('Content-Type', 'text/plain;')
|
||||||
reply.set_payload(self.ticket_data['description'])
|
reply.set_payload(self.ticket_data['description'])
|
||||||
|
|
||||||
object_from_message(
|
extract_email_metadata(
|
||||||
str(reply), self.queue_public_with_notifications_disabled, logger=logger)
|
str(reply), self.queue_public_with_notifications_disabled, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
@ -1037,7 +1034,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
|
|
||||||
email_count = len(mail.outbox)
|
email_count = len(mail.outbox)
|
||||||
|
|
||||||
object_from_message(str(msg), self.queue_public, logger=logger)
|
extract_email_metadata(str(msg), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -1059,7 +1056,7 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
reply.__setitem__('Content-Type', 'text/plain;')
|
reply.__setitem__('Content-Type', 'text/plain;')
|
||||||
reply.set_payload(self.ticket_data['description'])
|
reply.set_payload(self.ticket_data['description'])
|
||||||
|
|
||||||
object_from_message(str(reply), self.queue_public, logger=logger)
|
extract_email_metadata(str(reply), self.queue_public, logger=logger)
|
||||||
|
|
||||||
followup = FollowUp.objects.get(message_id=message_id)
|
followup = FollowUp.objects.get(message_id=message_id)
|
||||||
ticket = Ticket.objects.get(id=followup.ticket.id)
|
ticket = Ticket.objects.get(id=followup.ticket.id)
|
||||||
@ -1088,23 +1085,48 @@ class EmailInteractionsTestCase(TestCase):
|
|||||||
cat = KBCategory.objects.create(
|
cat = KBCategory.objects.create(
|
||||||
title="Test Cat",
|
title="Test Cat",
|
||||||
slug="test_cat",
|
slug="test_cat",
|
||||||
description="This is a test category",
|
description="This is a test category",
|
||||||
queue=self.queue_public,
|
queue=self.queue_public,
|
||||||
)
|
)
|
||||||
cat.save()
|
cat.save()
|
||||||
|
attr_list = {
|
||||||
|
"f1_field_title": "KBItem 1",
|
||||||
|
"f1_attr": "kbitem",
|
||||||
|
"f1_attr_value": "1",
|
||||||
|
"f2_attr": "submitter_email",
|
||||||
|
"f2_attr_value": "foo@bar.cz",
|
||||||
|
"f3_attr": "title",
|
||||||
|
"f3_attr_value": "lol",
|
||||||
|
}
|
||||||
self.kbitem1 = KBItem.objects.create(
|
self.kbitem1 = KBItem.objects.create(
|
||||||
category=cat,
|
category=cat,
|
||||||
title="KBItem 1",
|
title=attr_list["f1_field_title"],
|
||||||
question="What?",
|
question="What?",
|
||||||
answer="A KB Item",
|
answer="A KB Item",
|
||||||
)
|
)
|
||||||
self.kbitem1.save()
|
self.kbitem1.save()
|
||||||
cat_url = reverse('helpdesk:submit') + \
|
cat_url = reverse('helpdesk:submit') + '?' \
|
||||||
"?kbitem=1&submitter_email=foo@bar.cz&title=lol"
|
+ attr_list["f1_attr"] + '=' + attr_list["f1_attr_value"] + '&' \
|
||||||
|
+ attr_list["f2_attr"] + '=' + attr_list["f2_attr_value"] + '&' \
|
||||||
|
+ attr_list["f3_attr"] + '=' + attr_list["f3_attr_value"]
|
||||||
response = self.client.get(cat_url)
|
response = self.client.get(cat_url)
|
||||||
|
# Get the rendered response to make it easier to debug if things go wrong
|
||||||
|
if (
|
||||||
|
hasattr(response, "render")
|
||||||
|
and callable(response.render)
|
||||||
|
and not response.is_rendered
|
||||||
|
):
|
||||||
|
response.render()
|
||||||
|
if response.streaming:
|
||||||
|
content = b"".join(response.streaming_content)
|
||||||
|
else:
|
||||||
|
content = response.content
|
||||||
|
|
||||||
|
|
||||||
|
msg_prefix = content.decode(response.charset)
|
||||||
self.assertContains(
|
self.assertContains(
|
||||||
response, '<option value="1" selected>KBItem 1</option>')
|
response, '<option value="' + attr_list["f1_attr_value"] + '" selected>' + attr_list["f1_field_title"] + '</option>', msg_prefix = msg_prefix)
|
||||||
self.assertContains(
|
self.assertContains(
|
||||||
response, '<input type="email" name="submitter_email" value="foo@bar.cz" class="form-control form-control" required id="id_submitter_email">')
|
response, '<input type="email" name="' + attr_list["f2_attr"] + '" value="' + attr_list["f2_attr_value"], msg_prefix = msg_prefix)
|
||||||
self.assertContains(
|
self.assertContains(
|
||||||
response, '<input type="text" name="title" value="lol" class="form-control form-control" maxlength="100" required id="id_title">')
|
response, '<input type="text" name="' + attr_list["f3_attr"] + '" value="' + attr_list["f3_attr_value"], msg_prefix = msg_prefix)
|
||||||
|
@ -153,21 +153,21 @@ def generate_file_mime_part(locale: str="en_US",filename: str = None, content: s
|
|||||||
encoders.encode_base64(part)
|
encoders.encode_base64(part)
|
||||||
if not filename:
|
if not filename:
|
||||||
filename = get_fake("word", locale=locale, min_length=8) + ".txt"
|
filename = get_fake("word", locale=locale, min_length=8) + ".txt"
|
||||||
part.add_header('Content-Disposition', "attachment; filename= %s" % filename)
|
part.add_header('Content-Disposition', "attachment; filename=%s" % filename)
|
||||||
return part
|
return part
|
||||||
|
|
||||||
def generate_image_mime_part(locale: str="en_US",imagename: str = None) -> Message:
|
def generate_image_mime_part(locale: str="en_US",imagename: str = None, disposition_primary_type: str = "attachment") -> Message:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
:param locale: change this to generate locale specific file name and attachment content
|
:param locale: change this to generate locale specific file name and attachment content
|
||||||
:param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated
|
:param filename: pass a file name if you want to specify a specific name otherwise a random name will be generated
|
||||||
"""
|
"""
|
||||||
part = MIMEImage(generate_random_image(image_format="JPEG", array_dims=(200, 200)))
|
part = MIMEImage(generate_random_image(image_format="JPEG", array_dims=(200, 200)))
|
||||||
part.set_payload(get_fake("text", locale=locale, min_length=1024))
|
#part.set_payload(get_fake("text", locale=locale, min_length=1024))
|
||||||
encoders.encode_base64(part)
|
encoders.encode_base64(part)
|
||||||
if not imagename:
|
if not imagename:
|
||||||
imagename = get_fake("word", locale=locale, min_length=8) + ".jpg"
|
imagename = get_fake("word", locale=locale, min_length=8) + ".jpg"
|
||||||
part.add_header('Content-Disposition', "attachment; filename= %s" % imagename)
|
part.add_header('Content-Disposition', disposition_primary_type + "; filename= %s" % imagename)
|
||||||
return part
|
return part
|
||||||
|
|
||||||
def generate_email_list(address_cnt: int = 3,
|
def generate_email_list(address_cnt: int = 3,
|
||||||
@ -215,10 +215,10 @@ def generate_mime_part(locale: str="en_US",
|
|||||||
"""
|
"""
|
||||||
if "plain" == part_type:
|
if "plain" == part_type:
|
||||||
body = get_fake("text", locale=locale, min_length=1024)
|
body = get_fake("text", locale=locale, min_length=1024)
|
||||||
msg = MIMEText(body)
|
msg = MIMEText(body, part_type)
|
||||||
elif "html" == part_type:
|
elif "html" == part_type:
|
||||||
body = get_fake_html(locale=locale, wrap_in_body_tag=True)
|
body = get_fake_html(locale=locale, wrap_in_body_tag=True)
|
||||||
msg = MIMEText(body)
|
msg = MIMEText(body, part_type)
|
||||||
elif "file" == part_type:
|
elif "file" == part_type:
|
||||||
msg = generate_file_mime_part(locale=locale)
|
msg = generate_file_mime_part(locale=locale)
|
||||||
elif "image" == part_type:
|
elif "image" == part_type:
|
||||||
@ -229,6 +229,7 @@ def generate_mime_part(locale: str="en_US",
|
|||||||
|
|
||||||
def generate_multipart_email(locale: str="en_US",
|
def generate_multipart_email(locale: str="en_US",
|
||||||
type_list: typing.List[str]=["plain", "html", "image"],
|
type_list: typing.List[str]=["plain", "html", "image"],
|
||||||
|
sub_type: str = None,
|
||||||
use_short_email: bool=False
|
use_short_email: bool=False
|
||||||
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
) -> typing.Tuple[Message, typing.Tuple[str, str], typing.Tuple[str, str]]:
|
||||||
"""
|
"""
|
||||||
@ -236,9 +237,10 @@ def generate_multipart_email(locale: str="en_US",
|
|||||||
|
|
||||||
:param locale:
|
:param locale:
|
||||||
:param type_list: options are plain, html, image (attachment), file (attachment)
|
:param type_list: options are plain, html, image (attachment), file (attachment)
|
||||||
|
:param sub_type: multipart sub type that defaults to "mixed" if not specified
|
||||||
:param use_short_email: produces a "To" or "From" that is only the email address if True
|
:param use_short_email: produces a "To" or "From" that is only the email address if True
|
||||||
"""
|
"""
|
||||||
msg = MIMEMultipart()
|
msg = MIMEMultipart(sub_type) if sub_type else MIMEMultipart()
|
||||||
for part_type in type_list:
|
for part_type in type_list:
|
||||||
msg.attach(generate_mime_part(locale=locale, part_type=part_type))
|
msg.attach(generate_mime_part(locale=locale, part_type=part_type))
|
||||||
from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email)
|
from_meta, to_meta = add_simple_email_headers(msg, locale=locale, use_short_email=use_short_email)
|
||||||
|
@ -7,7 +7,6 @@ akismet
|
|||||||
markdown
|
markdown
|
||||||
beautifulsoup4
|
beautifulsoup4
|
||||||
lxml
|
lxml
|
||||||
simplejson
|
|
||||||
pytz
|
pytz
|
||||||
six
|
six
|
||||||
djangorestframework
|
djangorestframework
|
||||||
|
@ -6,7 +6,6 @@ akismet
|
|||||||
markdown
|
markdown
|
||||||
beautifulsoup4
|
beautifulsoup4
|
||||||
lxml
|
lxml
|
||||||
simplejson
|
|
||||||
pytz
|
pytz
|
||||||
pinax_teams
|
pinax_teams
|
||||||
djangorestframework
|
djangorestframework
|
||||||
|
Loading…
Reference in New Issue
Block a user