refactor all handling of attached files

Extract attachment processing from forms, views.staff, and management.command.get_email modules, and consolidate it into a unified lib module function.
Also refactor the affected components, most notably lib.send_templated_email, to make it easier (IMO) to reason about changes to them.
Add unit tests for attachments with UTF-8 filenames, and functional tests for submission of same, as well as ASCII versions, through the public ticket-form.
Remove unused Attachment method "get_upload_to".
This commit is contained in:
Jonathan Barratt 2016-11-10 23:23:16 +07:00
parent 142c2367d2
commit 5acd891c68
No known key found for this signature in database
GPG Key ID: C007F833B47313DA
7 changed files with 265 additions and 221 deletions

View File

@ -17,7 +17,7 @@ from django.utils.translation import ugettext_lazy as _
from django.contrib.auth import get_user_model
from django.utils import timezone
from helpdesk.lib import send_templated_mail, safe_template_context
from helpdesk.lib import send_templated_mail, safe_template_context, process_attachments
from helpdesk.models import (Ticket, Queue, FollowUp, Attachment, IgnoreEmail, TicketCC,
CustomField, TicketCustomFieldValue, TicketDependency)
from helpdesk import settings as helpdesk_settings
@ -228,28 +228,10 @@ class AbstractTicketForm(CustomFieldMixin, forms.Form):
return followup
def _attach_files_to_follow_up(self, followup):
attachments = []
if self.cleaned_data['attachment']:
import mimetypes
attachment = self.cleaned_data['attachment']
filename = attachment.name.replace(' ', '_')
att = Attachment(
followup=followup,
filename=filename,
mime_type=mimetypes.guess_type(filename)[0] or 'application/octet-stream',
size=attachment.size,
)
att.file.save(attachment.name, attachment, save=False)
att.save()
if attachment.size < getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000):
# Only files smaller than 512kb (or as defined in
# settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email.
try:
attachments.append([att.filename, att.file])
except NotImplementedError:
pass
return attachments
files = self.cleaned_data['attachment']
if files:
files = process_attachments(followup, [files])
return files
@staticmethod
def _send_messages(ticket, queue, followup, files, user=None):

View File

@ -7,6 +7,8 @@ lib.py - Common functions (eg multipart e-mail)
"""
import logging
import mimetypes
import os
try:
from base64 import urlsafe_b64encode as b64encode
@ -17,22 +19,25 @@ try:
except ImportError:
from base64 import decodestring as b64decode
from django.utils.encoding import smart_str
from django.conf import settings
from django.db.models import Q
from django.utils.encoding import smart_text
from django.utils.safestring import mark_safe
from helpdesk.models import Attachment, EmailTemplate
logger = logging.getLogger('helpdesk')
def send_templated_mail(template_name,
email_context,
context,
recipients,
sender=None,
bcc=None,
fail_silently=False,
files=None):
"""
send_templated_mail() is a warpper around Django's e-mail routines that
send_templated_mail() is a wrapper around Django's e-mail routines that
allows us to easily send multipart (text/plain & text/html) e-mails using
templates that are stored in the database. This lets the admin provide
both a text and a HTML template for each message.
@ -40,7 +45,7 @@ def send_templated_mail(template_name,
template_name is the slug of the template to use for this message (see
models.EmailTemplate)
email_context is a dictionary to be used when rendering the template
context is a dictionary to be used when rendering the template
recipients can be either a string, eg 'a@b.com', or a list of strings.
@ -53,68 +58,52 @@ def send_templated_mail(template_name,
fail_silently is passed to Django's mail routine. Set to 'True' to ignore
any errors at send time.
files can be a list of tuple. Each tuple should be a filename to attach,
files can be a list of tuples. Each tuple should be a filename to attach,
along with the File objects to be read. files can be blank.
"""
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.template import engines
from_string = engines['django'].from_string
from helpdesk.models import EmailTemplate
from helpdesk.settings import HELPDESK_EMAIL_SUBJECT_TEMPLATE, \
HELPDESK_EMAIL_FALLBACK_LOCALE
import os
context = email_context
locale = context['queue'].get('locale') or HELPDESK_EMAIL_FALLBACK_LOCALE
if hasattr(context['queue'], 'locale'):
locale = getattr(context['queue'], 'locale', '')
else:
locale = context['queue'].get('locale', HELPDESK_EMAIL_FALLBACK_LOCALE)
if not locale:
locale = HELPDESK_EMAIL_FALLBACK_LOCALE
t = None
try:
t = EmailTemplate.objects.get(template_name__iexact=template_name, locale=locale)
except EmailTemplate.DoesNotExist:
pass
if not t:
try:
t = EmailTemplate.objects.get(template_name__iexact=template_name, locale__isnull=True)
except EmailTemplate.DoesNotExist:
logger.warning('template "%s" does not exist, no mail sent',
template_name)
logger.warning('template "%s" does not exist, no mail sent', template_name)
return # just ignore if template doesn't exist
if not sender:
sender = settings.DEFAULT_FROM_EMAIL
subject_part = from_string(
HELPDESK_EMAIL_SUBJECT_TEMPLATE % {"subject": t.subject}
).render(context).replace('\n', '').replace('\r', '')
footer_file = os.path.join('helpdesk', locale, 'email_text_footer.txt')
from django.template import engines
template_func = engines['django'].from_string
text_part = template_func(
text_part = from_string(
"%s{%% include '%s' %%}" % (t.plain_text, footer_file)
).render(context)
email_html_base_file = os.path.join('helpdesk', locale, 'email_html_base.html')
# keep new lines in html emails
if 'comment' in context:
html_txt = context['comment']
html_txt = html_txt.replace('\r\n', '<br>')
context['comment'] = mark_safe(html_txt)
context['comment'] = mark_safe(context['comment'].replace('\r\n', '<br>'))
html_part = template_func(
html_part = from_string(
"{%% extends '%s' %%}{%% block title %%}"
"%s"
"{%% endblock %%}{%% block content %%}%s{%% endblock %%}" %
(email_html_base_file, t.heading, t.html)).render(context)
(email_html_base_file, t.heading, t.html)
).render(context)
subject_part = template_func(
subject_part = from_string(
HELPDESK_EMAIL_SUBJECT_TEMPLATE % {
"subject": t.subject,
}).render(context)
@ -123,19 +112,16 @@ def send_templated_mail(template_name,
if recipients.find(','):
recipients = recipients.split(',')
elif type(recipients) != list:
recipients = [recipients, ]
recipients = [recipients]
msg = EmailMultiAlternatives(
subject_part.replace('\n', '').replace('\r', ''),
text_part, sender, recipients, bcc=bcc)
msg = EmailMultiAlternatives(subject_part, text_part,
sender or settings.DEFAULT_FROM_EMAIL,
recipients, bcc=bcc)
msg.attach_alternative(html_part, "text/html")
if files:
for attachment in files:
file_to_attach = attachment[1]
file_to_attach.open()
msg.attach(filename=attachment[0], content=file_to_attach.read())
file_to_attach.close()
for filename, filefield in files:
msg.attach(filename, open(filefield.path).read())
return msg.send(fail_silently)
@ -254,7 +240,6 @@ def text_is_spam(text, request):
# False if it is not spam. If it cannot be checked for some reason, we
# assume it isn't spam.
from django.contrib.sites.models import Site
from django.conf import settings
try:
from helpdesk.akismet import Akismet
except:
@ -286,6 +271,32 @@ def text_is_spam(text, request):
'comment_author': '',
}
return ak.comment_check(smart_str(text), data=ak_data)
return ak.comment_check(smart_text(text), data=ak_data)
return False
def process_attachments(followup, attached_files):
max_email_attachment_size = getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000)
attachments = []
for attached in attached_files:
if attached.size:
filename = smart_text(attached.name)
att = Attachment(
followup=followup,
file=attached,
filename=filename,
mime_type=attached.content_type or
mimetypes.guess_type(filename, strict=False)[0] or
'application/octet-stream',
size=attached.size,
)
att.save()
if attached.size < max_email_attachment_size:
# Only files smaller than 512kb (or as defined in
# settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email.
attachments.append([filename, att.file])
return attachments

View File

@ -11,34 +11,31 @@ scripts/get_email.py - Designed to be run from cron, this script checks the
adding to existing tickets if needed)
"""
from datetime import timedelta
import email
import imaplib
import mimetypes
from os import listdir, unlink
from os.path import isfile, join
import poplib
import re
import socket
from os import listdir, unlink
from os.path import isfile, join
from datetime import timedelta
from email.header import decode_header
from email.utils import parseaddr, collapse_rfc2231_value
from optparse import make_option
from time import ctime
from email_reply_parser import EmailReplyParser
from django.core.files.base import ContentFile
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management.base import BaseCommand
from django.db.models import Q
from django.utils.translation import ugettext as _
from django.utils import six, timezone
from django.utils import encoding, six, timezone
from helpdesk import settings
from helpdesk.lib import send_templated_mail, safe_template_context
from helpdesk.models import Queue, Ticket, FollowUp, Attachment, IgnoreEmail
from helpdesk.lib import send_templated_mail, safe_template_context, process_attachments
from helpdesk.models import Queue, Ticket, FollowUp, IgnoreEmail
import logging
from time import ctime
STRIPPED_SUBJECT_STRINGS = [
@ -93,22 +90,16 @@ def process_email(quiet=False):
if quiet:
logger.propagate = False # do not propagate to root logger that would log to console
logdir = q.logging_dir or '/var/log/helpdesk/'
handler = logging.FileHandler(logdir + q.slug + '_get_email.log')
handler = logging.FileHandler(join(logdir, q.slug + '_get_email.log'))
logger.addHandler(handler)
if not q.email_box_last_check:
q.email_box_last_check = timezone.now() - timedelta(minutes=30)
if not q.email_box_interval:
q.email_box_interval = 0
queue_time_delta = timedelta(minutes=q.email_box_interval)
if (q.email_box_last_check + queue_time_delta) > timezone.now():
continue
queue_time_delta = timedelta(minutes=q.email_box_interval or 0)
if (q.email_box_last_check + queue_time_delta) < timezone.now():
process_queue(q, logger=logger)
q.email_box_last_check = timezone.now()
q.save()
@ -165,20 +156,20 @@ def process_queue(q, logger):
server.pass_(q.email_box_pass or settings.QUEUE_EMAIL_BOX_PASSWORD)
messagesInfo = server.list()[1]
logger.info("Received %s messages from POP3 server" % str(len(messagesInfo)))
logger.info("Received %d messages from POP3 server" % len(messagesInfo))
for msg in messagesInfo:
msgNum = msg.split(" ")[0]
logger.info("Processing message %s" % str(msgNum))
logger.info("Processing message %s" % msgNum)
full_message = "\n".join(server.retr(msgNum)[1])
ticket = ticket_from_message(message=full_message, queue=q, logger=logger)
if ticket:
server.dele(msgNum)
logger.info("Successfully processed message %s, deleted from POP3 server" % str(msgNum))
logger.info("Successfully processed message %s, deleted from POP3 server" % msgNum)
else:
logger.warn("Message %s was not successfully processed, and will be left on POP3 server" % str(msgNum))
logger.warn("Message %s was not successfully processed, and will be left on POP3 server" % msgNum)
server.quit()
@ -207,16 +198,16 @@ def process_queue(q, logger):
status, data = server.search(None, 'NOT', 'DELETED')
if data:
msgnums = data[0].split()
logger.info("Received %s messages from IMAP server" % str(len(msgnums)))
logger.info("Received %d messages from IMAP server" % len(msgnums))
for num in msgnums:
logger.info("Processing message %s" % str(num))
logger.info("Processing message %s" % num)
status, data = server.fetch(num, '(RFC822)')
ticket = ticket_from_message(message=data[0][1], queue=q, logger=logger)
ticket = ticket_from_message(message=encoding.smart_text(data[0][1]), queue=q, logger=logger)
if ticket:
server.store(num, '+FLAGS', '\\Deleted')
logger.info("Successfully processed message %s, deleted from IMAP server" % str(msgNum))
logger.info("Successfully processed message %s, deleted from IMAP server" % num)
else:
logger.warn("Message %s was not successfully processed, and will be left on IMAP server" % str(msgNum))
logger.warn("Message %s was not successfully processed, and will be left on IMAP server" % num)
server.expunge()
server.close()
@ -225,20 +216,23 @@ def process_queue(q, logger):
elif email_box_type == 'local':
mail_dir = q.email_box_local_dir or '/var/lib/mail/helpdesk/'
mail = [join(mail_dir, f) for f in listdir(mail_dir) if isfile(join(mail_dir, f))]
logger.info("Found %s messages in local mailbox directory" % str(len(mail)))
for m in mail:
logger.info("Processing message %s" % str(m))
logger.info("Found %d messages in local mailbox directory" % len(mail))
logger.info("Found %d messages in local mailbox directory" % len(mail))
for i, m in enumerate(mail, 1):
logger.info("Processing message %d" % i)
with open(m, 'r') as f:
ticket = ticket_from_message(message=f.read(), queue=q, logger=logger)
if ticket:
logger.info("Successfully processed message %s, ticket/comment created." % str(m))
logger.info("Successfully processed message %d, ticket/comment created." % i)
try:
unlink(m) # delete message file if ticket was successful
logger.info("Successfully deleted message %s." % str(m))
except:
logger.error("Unable to delete message %s." % str(m))
logger.error("Unable to delete message %d." % i)
else:
logger.warn("Message %s was not successfully processed, and will be left in local directory" % str(m))
logger.info("Successfully deleted message %d." % i)
else:
logger.warn("Message %d was not successfully processed, and will be left in local directory" % i)
def decodeUnknown(charset, string):
@ -256,12 +250,12 @@ def decodeUnknown(charset, string):
return str(string, encoding='utf-8', errors='replace')
except:
return str(string, encoding='iso8859-1', errors='replace')
return str(string, encoding=charset)
return str(string, encoding=charset, errors='replace')
return string
def decode_mail_headers(string):
decoded = decode_header(string)
decoded = email.header.decode_header(string)
if six.PY2:
return u' '.join([unicode(msg, charset or 'utf-8') for msg, charset in decoded])
elif six.PY3:
@ -270,8 +264,7 @@ def decode_mail_headers(string):
def ticket_from_message(message, queue, logger):
# 'message' must be an RFC822 formatted message.
msg = message
message = email.message_from_string(msg)
message = email.message_from_string(message)
subject = message.get('subject', _('Created from e-mail'))
subject = decode_mail_headers(decodeUnknown(message.get_charset(), subject))
for affix in STRIPPED_SUBJECT_STRINGS:
@ -280,10 +273,7 @@ def ticket_from_message(message, queue, logger):
sender = message.get('from', _('Unknown Sender'))
sender = decode_mail_headers(decodeUnknown(message.get_charset(), sender))
sender_email = parseaddr(sender)[1]
body_plain, body_html = '', ''
sender_email = email.utils.parseaddr(sender)[1]
for ignore in IgnoreEmail.objects.filter(Q(queues=queue) | Q(queues__isnull=True)):
if ignore.test(sender_email):
@ -302,6 +292,7 @@ def ticket_from_message(message, queue, logger):
logger.info("No tracking ID matched.")
ticket = None
body = None
counter = 0
files = []
@ -311,80 +302,61 @@ def ticket_from_message(message, queue, logger):
name = part.get_param("name")
if name:
name = collapse_rfc2231_value(name)
name = email.utils.collapse_rfc2231_value(name)
if part.get_content_maintype() == 'text' and name is None:
if part.get_content_subtype() == 'plain':
body_plain = EmailReplyParser.parse_reply(
decodeUnknown(part.get_content_charset(), part.get_payload(decode=True)))
body = EmailReplyParser.parse_reply(
decodeUnknown(part.get_content_charset(), part.get_payload(decode=True))
)
logger.debug("Discovered plain text MIME part")
else:
body_html = part.get_payload(decode=True)
files.append(
SimpleUploadedFile(_("email_html_body.html"), encoding.smart_bytes(part.get_payload()), 'text/html')
)
logger.debug("Discovered HTML MIME part")
else:
if not name:
ext = mimetypes.guess_extension(part.get_content_type())
name = "part-%i%s" % (counter, ext)
files.append({
'filename': name,
'content': part.get_payload(decode=True),
'type': part.get_content_type()},
)
files.append(SimpleUploadedFile(name, encoding.smart_bytes(part.get_payload()), part.get_content_type()))
logger.debug("Found MIME attachment %s" % name)
counter += 1
if body_plain:
body = body_plain
else:
if not body:
body = _('No plain-text email body available. Please see attachment "email_html_body.html".')
if body_html:
files.append({
'filename': _("email_html_body.html"),
'content': body_html,
'type': 'text/html',
})
now = timezone.now()
if ticket:
try:
t = Ticket.objects.get(id=ticket)
new = False
logger.info("Found existing ticket with Tracking ID %s-%s" % (t.queue.slug, t.id))
except Ticket.DoesNotExist:
logger.info("Tracking ID %s-%s not associated with existing ticket. Creating new ticket." % (queue.slug, ticket))
ticket = None
priority = 3
else:
logger.info("Found existing ticket with Tracking ID %s-%s" % (t.queue.slug, t.id))
if t.status == Ticket.CLOSED_STATUS:
t.status = Ticket.REOPENED_STATUS
t.save()
new = False
smtp_priority = message.get('priority', '')
smtp_importance = message.get('importance', '')
high_priority_types = ('high', 'important', '1', 'urgent')
if smtp_priority in high_priority_types or smtp_importance in high_priority_types:
priority = 2
high_priority_types = {'high', 'important', '1', 'urgent'}
priority = 2 if high_priority_types & {smtp_priority, smtp_importance} else 3
if ticket is None:
t = Ticket(
new = True
t = Ticket.objects.create(
title=subject,
queue=queue,
submitter_email=sender_email,
created=now,
created=timezone.now(),
description=body,
priority=priority,
)
t.save()
new = True
logger.debug("Created new ticket %s-%s" % (t.queue.slug, t.id))
elif t.status == Ticket.CLOSED_STATUS:
t.status = Ticket.REOPENED_STATUS
t.save()
f = FollowUp(
ticket=t,
title=_('E-Mail Received from %(sender_email)s' % {'sender_email': sender_email}),
@ -405,28 +377,13 @@ def ticket_from_message(message, queue, logger):
elif six.PY3:
logger.info("[%s-%s] %s" % (t.queue.slug, t.id, t.title,))
for file in files:
if file['content']:
if six.PY2:
filename = file['filename'].encode('ascii', 'replace').replace(' ', '_')
elif six.PY3:
filename = file['filename'].replace(' ', '_')
filename = re.sub('[^a-zA-Z0-9._-]+', '', filename)
logger.info("Found attachment '%s'" % filename)
a = Attachment(
followup=f,
filename=filename,
mime_type=file['type'],
size=len(file['content']),
)
a.file.save(filename, ContentFile(file['content']), save=False)
a.save()
logger.info("Attachment '%s' successfully added to ticket." % filename)
attached = process_attachments(f, files)
for att_file in attached:
logger.info("Attachment '%s' successfully added to ticket from email." % att_file[0])
context = safe_template_context(t)
if new:
if sender_email:
send_templated_mail(
'newticket_submitter',
@ -435,7 +392,6 @@ def ticket_from_message(message, queue, logger):
sender=queue.from_address,
fail_silently=True,
)
if queue.new_ticket_cc:
send_templated_mail(
'newticket_cc',
@ -444,7 +400,6 @@ def ticket_from_message(message, queue, logger):
sender=queue.from_address,
fail_silently=True,
)
if queue.updated_ticket_cc and queue.updated_ticket_cc != queue.new_ticket_cc:
send_templated_mail(
'newticket_cc',
@ -453,15 +408,8 @@ def ticket_from_message(message, queue, logger):
sender=queue.from_address,
fail_silently=True,
)
else:
context.update(comment=f.comment)
# if t.status == Ticket.REOPENED_STATUS:
# update = _(' (Reopened)')
# else:
# update = _(' (Updated)')
if t.assigned_to:
send_templated_mail(
'updated_owner',
@ -470,7 +418,6 @@ def ticket_from_message(message, queue, logger):
sender=queue.from_address,
fail_silently=True,
)
if queue.updated_ticket_cc:
send_templated_mail(
'updated_cc',

View File

@ -741,7 +741,6 @@ def attachment_path(instance, filename):
putting attachments in a folder off attachments for ticket/followup_id/.
"""
import os
from django.conf import settings
os.umask(0)
path = 'helpdesk/attachments/%s/%s' % (instance.followup.ticket.ticket_for_url, instance.followup.id)
att_path = os.path.join(settings.MEDIA_ROOT, path)
@ -784,15 +783,6 @@ class Attachment(models.Model):
help_text=_('Size of this file in bytes'),
)
def get_upload_to(self, field_attname):
""" Get upload_to path specific to this item """
if not self.id:
return u''
return u'helpdesk/attachments/%s/%s' % (
self.followup.ticket.ticket_for_url,
self.followup.id
)
def __str__(self):
return '%s' % self.filename

View File

@ -0,0 +1,135 @@
# vim: set fileencoding=utf-8 :
from __future__ import unicode_literals
import os
import shutil
from tempfile import gettempdir
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.urlresolvers import reverse
from django.test import override_settings, TestCase
from django.utils.encoding import smart_text
from helpdesk import lib, models
try:
# Python >= 3.3
from unittest import mock
except ImportError:
# Python < 3.3
import mock
MEDIA_DIR = os.path.join(gettempdir(), 'helpdesk_test_media')
@override_settings(MEDIA_ROOT=MEDIA_DIR)
class AttachmentIntegrationTests(TestCase):
fixtures = ['emailtemplate.json']
def setUp(self):
self.queue_public = models.Queue.objects.create(
title='Public Queue',
slug='pub_q',
allow_public_submission=True,
new_ticket_cc='new.public@example.com',
updated_ticket_cc='update.public@example.com',
)
self.queue_private = models.Queue.objects.create(
title='Private Queue',
slug='priv_q',
allow_public_submission=False,
new_ticket_cc='new.private@example.com',
updated_ticket_cc='update.private@example.com',
)
self.ticket_data = {
'title': 'Test Ticket Title',
'body': 'Test Ticket Desc',
'priority': 3,
'submitter_email': 'submitter@example.com',
}
def test_create_pub_ticket_with_attachment(self):
test_file = SimpleUploadedFile('test_att.txt', b'attached file content', 'text/plain')
post_data = self.ticket_data.copy()
post_data.update({
'queue': self.queue_public.id,
'attachment': test_file,
})
# Ensure ticket form submits with attachment successfully
response = self.client.post(reverse('helpdesk:home'), post_data, follow=True)
self.assertContains(response, test_file.name)
# Ensure attachment is available with correct content
att = models.Attachment.objects.get(followup__ticket=response.context['ticket'])
with open(os.path.join(MEDIA_DIR, att.file.name)) as file_on_disk:
disk_content = file_on_disk.read()
self.assertEqual(disk_content, 'attached file content')
def test_create_pub_ticket_with_attachment_utf8(self):
test_file = SimpleUploadedFile('ß°äöü.txt', 'โจ'.encode('utf-8'), 'text/utf-8')
post_data = self.ticket_data.copy()
post_data.update({
'queue': self.queue_public.id,
'attachment': test_file,
})
# Ensure ticket form submits with attachment successfully
response = self.client.post(reverse('helpdesk:home'), post_data, follow=True)
self.assertContains(response, test_file.name)
# Ensure attachment is available with correct content
att = models.Attachment.objects.get(followup__ticket=response.context['ticket'])
with open(os.path.join(MEDIA_DIR, att.file.name)) as file_on_disk:
disk_content = smart_text(file_on_disk.read(), 'utf-8')
self.assertEqual(disk_content, 'โจ')
@mock.patch.object(models.FollowUp, 'save', autospec=True)
@mock.patch.object(models.Ticket, 'save', autospec=True)
@mock.patch.object(models.Queue, 'save', autospec=True)
class AttachmentUnitTests(TestCase):
def setUp(self):
self.file_attrs = {
'filename': '°ßäöü.txt',
'content': 'โจ'.encode('utf-8'),
'content-type': 'text/utf8',
}
self.test_file = SimpleUploadedFile.from_dict(self.file_attrs)
self.follow_up = models.FollowUp(ticket=models.Ticket(queue=models.Queue()))
@mock.patch('helpdesk.lib.Attachment', autospec=True)
def test_unicode_attachment_filename(self, mock_att_save, mock_queue_save, mock_ticket_save, mock_follow_up_save):
""" check utf-8 data is parsed correcltly """
filename, fileobj = lib.process_attachments(self.follow_up, [self.test_file])[0]
mock_att_save.assert_called_with(
file=self.test_file,
filename=self.file_attrs['filename'],
mime_type=self.file_attrs['content-type'],
size=len(self.file_attrs['content']),
followup=self.follow_up
)
self.assertEqual(filename, self.file_attrs['filename'])
@mock.patch.object(lib.Attachment, 'save', autospec=True)
@override_settings(MEDIA_ROOT=MEDIA_DIR)
def test_unicode_filename_to_filesystem(self, mock_att_save, mock_queue_save, mock_ticket_save, mock_follow_up_save):
""" don't mock saving to filesystem to test file renames caused by storage layer """
filename, fileobj = lib.process_attachments(self.follow_up, [self.test_file])[0]
# Attachment object was zeroth positional arg (i.e. self) of att.save call
attachment_obj = mock_att_save.call_args[0][0]
mock_att_save.assert_called_once_with(attachment_obj)
self.assertIsInstance(attachment_obj, models.Attachment)
self.assertEqual(attachment_obj.filename, self.file_attrs['filename'])
def tearDownModule():
try:
shutil.rmtree(MEDIA_DIR)
except OSError:
pass

View File

@ -126,11 +126,7 @@ class GetEmailParametricTemplate(object):
mocked_imaplib_server.fetch = mock.Mock(side_effect=lambda x, _: imap_emails[x])
with mock.patch('helpdesk.management.commands.get_email.imaplib', autospec=True) as mocked_imaplib:
mocked_imaplib.IMAP4 = mock.Mock(return_value=mocked_imaplib_server)
try:
call_command('get_email')
except UnboundLocalError:
# known bug fixed by a subsequent commit
return True
ticket1 = get_object_or_404(Ticket, pk=1)
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
@ -140,6 +136,7 @@ class GetEmailParametricTemplate(object):
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
self.assertEqual(ticket2.description, "This is the helpdesk comment via email.")
# build matrix of test cases
case_methods = [c[0] for c in Queue._meta.get_field('email_box_type').choices]
case_socks = [False] + [c[0] for c in Queue._meta.get_field('socks_proxy_type').choices]

View File

@ -31,6 +31,7 @@ from helpdesk.forms import (
)
from helpdesk.lib import (
send_templated_mail, query_to_dict, apply_query, safe_template_context,
process_attachments,
)
from helpdesk.models import (
Ticket, Queue, FollowUp, TicketChange, PreSetReply, Attachment, SavedSearch,
@ -458,26 +459,7 @@ def update_ticket(request, ticket_id, public=False):
f.save()
files = []
if request.FILES:
import mimetypes
for file in request.FILES.getlist('attachment'):
filename = file.name.encode('ascii', 'ignore')
filename = filename.decode("utf-8")
print(filename)
a = Attachment(
followup=f,
filename=filename,
mime_type=file.content_type or 'application/octet-stream',
size=file.size,
)
a.file.save(filename, file, save=False)
a.save()
if file.size < getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000):
# Only files smaller than 512kb (or as defined in
# settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email.
files.append([a.filename, a.file])
files = process_attachments(f, request.FILES.getlist('attachment'))
if title != ticket.title:
c = TicketChange(