From 5acd891c68400523954b209d3164daccb33fc904 Mon Sep 17 00:00:00 2001 From: Jonathan Barratt Date: Thu, 10 Nov 2016 23:23:16 +0700 Subject: [PATCH] refactor all handling of attached files Extract attachment processing from forms, views.staff, and management.command.get_email modules, and consolidate it into a unified lib module function. Also refactor the affected components, most notably lib.send_templated_email, to make it easier (IMO) to reason about changes to them. Add unit tests for attachments with UTF-8 filenames, and functional tests for submission of same, as well as ASCII versions, through the public ticket-form. Remove unused Attachment method "get_upload_to". --- helpdesk/forms.py | 28 +--- helpdesk/lib.py | 101 ++++++------ helpdesk/management/commands/get_email.py | 183 ++++++++-------------- helpdesk/models.py | 10 -- helpdesk/tests/test_attachments.py | 135 ++++++++++++++++ helpdesk/tests/test_get_email.py | 7 +- helpdesk/views/staff.py | 22 +-- 7 files changed, 265 insertions(+), 221 deletions(-) create mode 100644 helpdesk/tests/test_attachments.py diff --git a/helpdesk/forms.py b/helpdesk/forms.py index 7222a2cd..013ab270 100644 --- a/helpdesk/forms.py +++ b/helpdesk/forms.py @@ -17,7 +17,7 @@ from django.utils.translation import ugettext_lazy as _ from django.contrib.auth import get_user_model from django.utils import timezone -from helpdesk.lib import send_templated_mail, safe_template_context +from helpdesk.lib import send_templated_mail, safe_template_context, process_attachments from helpdesk.models import (Ticket, Queue, FollowUp, Attachment, IgnoreEmail, TicketCC, CustomField, TicketCustomFieldValue, TicketDependency) from helpdesk import settings as helpdesk_settings @@ -228,28 +228,10 @@ class AbstractTicketForm(CustomFieldMixin, forms.Form): return followup def _attach_files_to_follow_up(self, followup): - attachments = [] - if self.cleaned_data['attachment']: - import mimetypes - attachment = self.cleaned_data['attachment'] - filename = attachment.name.replace(' ', '_') - att = Attachment( - followup=followup, - filename=filename, - mime_type=mimetypes.guess_type(filename)[0] or 'application/octet-stream', - size=attachment.size, - ) - att.file.save(attachment.name, attachment, save=False) - att.save() - - if attachment.size < getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000): - # Only files smaller than 512kb (or as defined in - # settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email. - try: - attachments.append([att.filename, att.file]) - except NotImplementedError: - pass - return attachments + files = self.cleaned_data['attachment'] + if files: + files = process_attachments(followup, [files]) + return files @staticmethod def _send_messages(ticket, queue, followup, files, user=None): diff --git a/helpdesk/lib.py b/helpdesk/lib.py index 3bf24ff9..73d309c0 100644 --- a/helpdesk/lib.py +++ b/helpdesk/lib.py @@ -7,6 +7,8 @@ lib.py - Common functions (eg multipart e-mail) """ import logging +import mimetypes +import os try: from base64 import urlsafe_b64encode as b64encode @@ -17,22 +19,25 @@ try: except ImportError: from base64 import decodestring as b64decode -from django.utils.encoding import smart_str +from django.conf import settings from django.db.models import Q +from django.utils.encoding import smart_text from django.utils.safestring import mark_safe +from helpdesk.models import Attachment, EmailTemplate + logger = logging.getLogger('helpdesk') def send_templated_mail(template_name, - email_context, + context, recipients, sender=None, bcc=None, fail_silently=False, files=None): """ - send_templated_mail() is a warpper around Django's e-mail routines that + send_templated_mail() is a wrapper around Django's e-mail routines that allows us to easily send multipart (text/plain & text/html) e-mails using templates that are stored in the database. This lets the admin provide both a text and a HTML template for each message. @@ -40,7 +45,7 @@ def send_templated_mail(template_name, template_name is the slug of the template to use for this message (see models.EmailTemplate) - email_context is a dictionary to be used when rendering the template + context is a dictionary to be used when rendering the template recipients can be either a string, eg 'a@b.com', or a list of strings. @@ -53,68 +58,52 @@ def send_templated_mail(template_name, fail_silently is passed to Django's mail routine. Set to 'True' to ignore any errors at send time. - files can be a list of tuple. Each tuple should be a filename to attach, + files can be a list of tuples. Each tuple should be a filename to attach, along with the File objects to be read. files can be blank. """ - from django.conf import settings from django.core.mail import EmailMultiAlternatives + from django.template import engines + from_string = engines['django'].from_string from helpdesk.models import EmailTemplate from helpdesk.settings import HELPDESK_EMAIL_SUBJECT_TEMPLATE, \ HELPDESK_EMAIL_FALLBACK_LOCALE - import os - context = email_context + locale = context['queue'].get('locale') or HELPDESK_EMAIL_FALLBACK_LOCALE - if hasattr(context['queue'], 'locale'): - locale = getattr(context['queue'], 'locale', '') - else: - locale = context['queue'].get('locale', HELPDESK_EMAIL_FALLBACK_LOCALE) - if not locale: - locale = HELPDESK_EMAIL_FALLBACK_LOCALE - - t = None try: t = EmailTemplate.objects.get(template_name__iexact=template_name, locale=locale) except EmailTemplate.DoesNotExist: - pass - - if not t: try: t = EmailTemplate.objects.get(template_name__iexact=template_name, locale__isnull=True) except EmailTemplate.DoesNotExist: - logger.warning('template "%s" does not exist, no mail sent', - template_name) + logger.warning('template "%s" does not exist, no mail sent', template_name) return # just ignore if template doesn't exist - if not sender: - sender = settings.DEFAULT_FROM_EMAIL + subject_part = from_string( + HELPDESK_EMAIL_SUBJECT_TEMPLATE % {"subject": t.subject} + ).render(context).replace('\n', '').replace('\r', '') footer_file = os.path.join('helpdesk', locale, 'email_text_footer.txt') - from django.template import engines - template_func = engines['django'].from_string - - text_part = template_func( + text_part = from_string( "%s{%% include '%s' %%}" % (t.plain_text, footer_file) ).render(context) email_html_base_file = os.path.join('helpdesk', locale, 'email_html_base.html') - # keep new lines in html emails if 'comment' in context: - html_txt = context['comment'] - html_txt = html_txt.replace('\r\n', '
') - context['comment'] = mark_safe(html_txt) + context['comment'] = mark_safe(context['comment'].replace('\r\n', '
')) - html_part = template_func( + html_part = from_string( "{%% extends '%s' %%}{%% block title %%}" "%s" "{%% endblock %%}{%% block content %%}%s{%% endblock %%}" % - (email_html_base_file, t.heading, t.html)).render(context) + (email_html_base_file, t.heading, t.html) + ).render(context) - subject_part = template_func( + subject_part = from_string( HELPDESK_EMAIL_SUBJECT_TEMPLATE % { "subject": t.subject, }).render(context) @@ -123,19 +112,16 @@ def send_templated_mail(template_name, if recipients.find(','): recipients = recipients.split(',') elif type(recipients) != list: - recipients = [recipients, ] + recipients = [recipients] - msg = EmailMultiAlternatives( - subject_part.replace('\n', '').replace('\r', ''), - text_part, sender, recipients, bcc=bcc) + msg = EmailMultiAlternatives(subject_part, text_part, + sender or settings.DEFAULT_FROM_EMAIL, + recipients, bcc=bcc) msg.attach_alternative(html_part, "text/html") if files: - for attachment in files: - file_to_attach = attachment[1] - file_to_attach.open() - msg.attach(filename=attachment[0], content=file_to_attach.read()) - file_to_attach.close() + for filename, filefield in files: + msg.attach(filename, open(filefield.path).read()) return msg.send(fail_silently) @@ -254,7 +240,6 @@ def text_is_spam(text, request): # False if it is not spam. If it cannot be checked for some reason, we # assume it isn't spam. from django.contrib.sites.models import Site - from django.conf import settings try: from helpdesk.akismet import Akismet except: @@ -286,6 +271,32 @@ def text_is_spam(text, request): 'comment_author': '', } - return ak.comment_check(smart_str(text), data=ak_data) + return ak.comment_check(smart_text(text), data=ak_data) return False + + +def process_attachments(followup, attached_files): + max_email_attachment_size = getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000) + attachments = [] + + for attached in attached_files: + if attached.size: + filename = smart_text(attached.name) + att = Attachment( + followup=followup, + file=attached, + filename=filename, + mime_type=attached.content_type or + mimetypes.guess_type(filename, strict=False)[0] or + 'application/octet-stream', + size=attached.size, + ) + att.save() + + if attached.size < max_email_attachment_size: + # Only files smaller than 512kb (or as defined in + # settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email. + attachments.append([filename, att.file]) + + return attachments diff --git a/helpdesk/management/commands/get_email.py b/helpdesk/management/commands/get_email.py index e5b40c41..0bf4c0a3 100644 --- a/helpdesk/management/commands/get_email.py +++ b/helpdesk/management/commands/get_email.py @@ -11,34 +11,31 @@ scripts/get_email.py - Designed to be run from cron, this script checks the adding to existing tickets if needed) """ +from datetime import timedelta import email import imaplib import mimetypes +from os import listdir, unlink +from os.path import isfile, join import poplib import re import socket -from os import listdir, unlink -from os.path import isfile, join - -from datetime import timedelta -from email.header import decode_header -from email.utils import parseaddr, collapse_rfc2231_value -from optparse import make_option +from time import ctime from email_reply_parser import EmailReplyParser from django.core.files.base import ContentFile +from django.core.files.uploadedfile import SimpleUploadedFile from django.core.management.base import BaseCommand from django.db.models import Q from django.utils.translation import ugettext as _ -from django.utils import six, timezone +from django.utils import encoding, six, timezone from helpdesk import settings -from helpdesk.lib import send_templated_mail, safe_template_context -from helpdesk.models import Queue, Ticket, FollowUp, Attachment, IgnoreEmail +from helpdesk.lib import send_templated_mail, safe_template_context, process_attachments +from helpdesk.models import Queue, Ticket, FollowUp, IgnoreEmail import logging -from time import ctime STRIPPED_SUBJECT_STRINGS = [ @@ -93,24 +90,18 @@ def process_email(quiet=False): if quiet: logger.propagate = False # do not propagate to root logger that would log to console logdir = q.logging_dir or '/var/log/helpdesk/' - handler = logging.FileHandler(logdir + q.slug + '_get_email.log') + handler = logging.FileHandler(join(logdir, q.slug + '_get_email.log')) logger.addHandler(handler) if not q.email_box_last_check: q.email_box_last_check = timezone.now() - timedelta(minutes=30) - if not q.email_box_interval: - q.email_box_interval = 0 + queue_time_delta = timedelta(minutes=q.email_box_interval or 0) - queue_time_delta = timedelta(minutes=q.email_box_interval) - - if (q.email_box_last_check + queue_time_delta) > timezone.now(): - continue - - process_queue(q, logger=logger) - - q.email_box_last_check = timezone.now() - q.save() + if (q.email_box_last_check + queue_time_delta) < timezone.now(): + process_queue(q, logger=logger) + q.email_box_last_check = timezone.now() + q.save() def process_queue(q, logger): @@ -165,20 +156,20 @@ def process_queue(q, logger): server.pass_(q.email_box_pass or settings.QUEUE_EMAIL_BOX_PASSWORD) messagesInfo = server.list()[1] - logger.info("Received %s messages from POP3 server" % str(len(messagesInfo))) + logger.info("Received %d messages from POP3 server" % len(messagesInfo)) for msg in messagesInfo: msgNum = msg.split(" ")[0] - logger.info("Processing message %s" % str(msgNum)) + logger.info("Processing message %s" % msgNum) full_message = "\n".join(server.retr(msgNum)[1]) ticket = ticket_from_message(message=full_message, queue=q, logger=logger) if ticket: server.dele(msgNum) - logger.info("Successfully processed message %s, deleted from POP3 server" % str(msgNum)) + logger.info("Successfully processed message %s, deleted from POP3 server" % msgNum) else: - logger.warn("Message %s was not successfully processed, and will be left on POP3 server" % str(msgNum)) + logger.warn("Message %s was not successfully processed, and will be left on POP3 server" % msgNum) server.quit() @@ -207,16 +198,16 @@ def process_queue(q, logger): status, data = server.search(None, 'NOT', 'DELETED') if data: msgnums = data[0].split() - logger.info("Received %s messages from IMAP server" % str(len(msgnums))) + logger.info("Received %d messages from IMAP server" % len(msgnums)) for num in msgnums: - logger.info("Processing message %s" % str(num)) + logger.info("Processing message %s" % num) status, data = server.fetch(num, '(RFC822)') - ticket = ticket_from_message(message=data[0][1], queue=q, logger=logger) + ticket = ticket_from_message(message=encoding.smart_text(data[0][1]), queue=q, logger=logger) if ticket: server.store(num, '+FLAGS', '\\Deleted') - logger.info("Successfully processed message %s, deleted from IMAP server" % str(msgNum)) + logger.info("Successfully processed message %s, deleted from IMAP server" % num) else: - logger.warn("Message %s was not successfully processed, and will be left on IMAP server" % str(msgNum)) + logger.warn("Message %s was not successfully processed, and will be left on IMAP server" % num) server.expunge() server.close() @@ -225,20 +216,23 @@ def process_queue(q, logger): elif email_box_type == 'local': mail_dir = q.email_box_local_dir or '/var/lib/mail/helpdesk/' mail = [join(mail_dir, f) for f in listdir(mail_dir) if isfile(join(mail_dir, f))] - logger.info("Found %s messages in local mailbox directory" % str(len(mail))) - for m in mail: - logger.info("Processing message %s" % str(m)) + logger.info("Found %d messages in local mailbox directory" % len(mail)) + + logger.info("Found %d messages in local mailbox directory" % len(mail)) + for i, m in enumerate(mail, 1): + logger.info("Processing message %d" % i) with open(m, 'r') as f: ticket = ticket_from_message(message=f.read(), queue=q, logger=logger) - if ticket: - logger.info("Successfully processed message %s, ticket/comment created." % str(m)) - try: - unlink(m) # delete message file if ticket was successful - logger.info("Successfully deleted message %s." % str(m)) - except: - logger.error("Unable to delete message %s." % str(m)) + if ticket: + logger.info("Successfully processed message %d, ticket/comment created." % i) + try: + unlink(m) # delete message file if ticket was successful + except: + logger.error("Unable to delete message %d." % i) else: - logger.warn("Message %s was not successfully processed, and will be left in local directory" % str(m)) + logger.info("Successfully deleted message %d." % i) + else: + logger.warn("Message %d was not successfully processed, and will be left in local directory" % i) def decodeUnknown(charset, string): @@ -256,12 +250,12 @@ def decodeUnknown(charset, string): return str(string, encoding='utf-8', errors='replace') except: return str(string, encoding='iso8859-1', errors='replace') - return str(string, encoding=charset) + return str(string, encoding=charset, errors='replace') return string def decode_mail_headers(string): - decoded = decode_header(string) + decoded = email.header.decode_header(string) if six.PY2: return u' '.join([unicode(msg, charset or 'utf-8') for msg, charset in decoded]) elif six.PY3: @@ -270,8 +264,7 @@ def decode_mail_headers(string): def ticket_from_message(message, queue, logger): # 'message' must be an RFC822 formatted message. - msg = message - message = email.message_from_string(msg) + message = email.message_from_string(message) subject = message.get('subject', _('Created from e-mail')) subject = decode_mail_headers(decodeUnknown(message.get_charset(), subject)) for affix in STRIPPED_SUBJECT_STRINGS: @@ -280,10 +273,7 @@ def ticket_from_message(message, queue, logger): sender = message.get('from', _('Unknown Sender')) sender = decode_mail_headers(decodeUnknown(message.get_charset(), sender)) - - sender_email = parseaddr(sender)[1] - - body_plain, body_html = '', '' + sender_email = email.utils.parseaddr(sender)[1] for ignore in IgnoreEmail.objects.filter(Q(queues=queue) | Q(queues__isnull=True)): if ignore.test(sender_email): @@ -302,6 +292,7 @@ def ticket_from_message(message, queue, logger): logger.info("No tracking ID matched.") ticket = None + body = None counter = 0 files = [] @@ -311,80 +302,61 @@ def ticket_from_message(message, queue, logger): name = part.get_param("name") if name: - name = collapse_rfc2231_value(name) + name = email.utils.collapse_rfc2231_value(name) if part.get_content_maintype() == 'text' and name is None: if part.get_content_subtype() == 'plain': - body_plain = EmailReplyParser.parse_reply( - decodeUnknown(part.get_content_charset(), part.get_payload(decode=True))) + body = EmailReplyParser.parse_reply( + decodeUnknown(part.get_content_charset(), part.get_payload(decode=True)) + ) logger.debug("Discovered plain text MIME part") else: - body_html = part.get_payload(decode=True) + files.append( + SimpleUploadedFile(_("email_html_body.html"), encoding.smart_bytes(part.get_payload()), 'text/html') + ) logger.debug("Discovered HTML MIME part") else: if not name: ext = mimetypes.guess_extension(part.get_content_type()) name = "part-%i%s" % (counter, ext) - - files.append({ - 'filename': name, - 'content': part.get_payload(decode=True), - 'type': part.get_content_type()}, - ) + files.append(SimpleUploadedFile(name, encoding.smart_bytes(part.get_payload()), part.get_content_type())) logger.debug("Found MIME attachment %s" % name) counter += 1 - if body_plain: - body = body_plain - else: + if not body: body = _('No plain-text email body available. Please see attachment "email_html_body.html".') - if body_html: - files.append({ - 'filename': _("email_html_body.html"), - 'content': body_html, - 'type': 'text/html', - }) - - now = timezone.now() - if ticket: try: t = Ticket.objects.get(id=ticket) - new = False - logger.info("Found existing ticket with Tracking ID %s-%s" % (t.queue.slug, t.id)) except Ticket.DoesNotExist: logger.info("Tracking ID %s-%s not associated with existing ticket. Creating new ticket." % (queue.slug, ticket)) ticket = None - - priority = 3 + else: + logger.info("Found existing ticket with Tracking ID %s-%s" % (t.queue.slug, t.id)) + if t.status == Ticket.CLOSED_STATUS: + t.status = Ticket.REOPENED_STATUS + t.save() + new = False smtp_priority = message.get('priority', '') smtp_importance = message.get('importance', '') - - high_priority_types = ('high', 'important', '1', 'urgent') - - if smtp_priority in high_priority_types or smtp_importance in high_priority_types: - priority = 2 + high_priority_types = {'high', 'important', '1', 'urgent'} + priority = 2 if high_priority_types & {smtp_priority, smtp_importance} else 3 if ticket is None: - t = Ticket( + new = True + t = Ticket.objects.create( title=subject, queue=queue, submitter_email=sender_email, - created=now, + created=timezone.now(), description=body, priority=priority, ) - t.save() - new = True logger.debug("Created new ticket %s-%s" % (t.queue.slug, t.id)) - elif t.status == Ticket.CLOSED_STATUS: - t.status = Ticket.REOPENED_STATUS - t.save() - f = FollowUp( ticket=t, title=_('E-Mail Received from %(sender_email)s' % {'sender_email': sender_email}), @@ -405,28 +377,13 @@ def ticket_from_message(message, queue, logger): elif six.PY3: logger.info("[%s-%s] %s" % (t.queue.slug, t.id, t.title,)) - for file in files: - if file['content']: - if six.PY2: - filename = file['filename'].encode('ascii', 'replace').replace(' ', '_') - elif six.PY3: - filename = file['filename'].replace(' ', '_') - filename = re.sub('[^a-zA-Z0-9._-]+', '', filename) - logger.info("Found attachment '%s'" % filename) - a = Attachment( - followup=f, - filename=filename, - mime_type=file['type'], - size=len(file['content']), - ) - a.file.save(filename, ContentFile(file['content']), save=False) - a.save() - logger.info("Attachment '%s' successfully added to ticket." % filename) + attached = process_attachments(f, files) + for att_file in attached: + logger.info("Attachment '%s' successfully added to ticket from email." % att_file[0]) context = safe_template_context(t) if new: - if sender_email: send_templated_mail( 'newticket_submitter', @@ -435,7 +392,6 @@ def ticket_from_message(message, queue, logger): sender=queue.from_address, fail_silently=True, ) - if queue.new_ticket_cc: send_templated_mail( 'newticket_cc', @@ -444,7 +400,6 @@ def ticket_from_message(message, queue, logger): sender=queue.from_address, fail_silently=True, ) - if queue.updated_ticket_cc and queue.updated_ticket_cc != queue.new_ticket_cc: send_templated_mail( 'newticket_cc', @@ -453,15 +408,8 @@ def ticket_from_message(message, queue, logger): sender=queue.from_address, fail_silently=True, ) - else: context.update(comment=f.comment) - - # if t.status == Ticket.REOPENED_STATUS: - # update = _(' (Reopened)') - # else: - # update = _(' (Updated)') - if t.assigned_to: send_templated_mail( 'updated_owner', @@ -470,7 +418,6 @@ def ticket_from_message(message, queue, logger): sender=queue.from_address, fail_silently=True, ) - if queue.updated_ticket_cc: send_templated_mail( 'updated_cc', diff --git a/helpdesk/models.py b/helpdesk/models.py index 0c3bd93b..c39e733b 100644 --- a/helpdesk/models.py +++ b/helpdesk/models.py @@ -741,7 +741,6 @@ def attachment_path(instance, filename): putting attachments in a folder off attachments for ticket/followup_id/. """ import os - from django.conf import settings os.umask(0) path = 'helpdesk/attachments/%s/%s' % (instance.followup.ticket.ticket_for_url, instance.followup.id) att_path = os.path.join(settings.MEDIA_ROOT, path) @@ -784,15 +783,6 @@ class Attachment(models.Model): help_text=_('Size of this file in bytes'), ) - def get_upload_to(self, field_attname): - """ Get upload_to path specific to this item """ - if not self.id: - return u'' - return u'helpdesk/attachments/%s/%s' % ( - self.followup.ticket.ticket_for_url, - self.followup.id - ) - def __str__(self): return '%s' % self.filename diff --git a/helpdesk/tests/test_attachments.py b/helpdesk/tests/test_attachments.py new file mode 100644 index 00000000..dbc53d51 --- /dev/null +++ b/helpdesk/tests/test_attachments.py @@ -0,0 +1,135 @@ +# vim: set fileencoding=utf-8 : +from __future__ import unicode_literals + +import os +import shutil +from tempfile import gettempdir + +from django.core.files.uploadedfile import SimpleUploadedFile +from django.core.urlresolvers import reverse +from django.test import override_settings, TestCase +from django.utils.encoding import smart_text + +from helpdesk import lib, models + +try: + # Python >= 3.3 + from unittest import mock +except ImportError: + # Python < 3.3 + import mock + +MEDIA_DIR = os.path.join(gettempdir(), 'helpdesk_test_media') + + +@override_settings(MEDIA_ROOT=MEDIA_DIR) +class AttachmentIntegrationTests(TestCase): + + fixtures = ['emailtemplate.json'] + + def setUp(self): + self.queue_public = models.Queue.objects.create( + title='Public Queue', + slug='pub_q', + allow_public_submission=True, + new_ticket_cc='new.public@example.com', + updated_ticket_cc='update.public@example.com', + ) + + self.queue_private = models.Queue.objects.create( + title='Private Queue', + slug='priv_q', + allow_public_submission=False, + new_ticket_cc='new.private@example.com', + updated_ticket_cc='update.private@example.com', + ) + + self.ticket_data = { + 'title': 'Test Ticket Title', + 'body': 'Test Ticket Desc', + 'priority': 3, + 'submitter_email': 'submitter@example.com', + } + + def test_create_pub_ticket_with_attachment(self): + test_file = SimpleUploadedFile('test_att.txt', b'attached file content', 'text/plain') + post_data = self.ticket_data.copy() + post_data.update({ + 'queue': self.queue_public.id, + 'attachment': test_file, + }) + + # Ensure ticket form submits with attachment successfully + response = self.client.post(reverse('helpdesk:home'), post_data, follow=True) + self.assertContains(response, test_file.name) + + # Ensure attachment is available with correct content + att = models.Attachment.objects.get(followup__ticket=response.context['ticket']) + with open(os.path.join(MEDIA_DIR, att.file.name)) as file_on_disk: + disk_content = file_on_disk.read() + self.assertEqual(disk_content, 'attached file content') + + def test_create_pub_ticket_with_attachment_utf8(self): + test_file = SimpleUploadedFile('ß°äöü.txt', 'โจ'.encode('utf-8'), 'text/utf-8') + post_data = self.ticket_data.copy() + post_data.update({ + 'queue': self.queue_public.id, + 'attachment': test_file, + }) + + # Ensure ticket form submits with attachment successfully + response = self.client.post(reverse('helpdesk:home'), post_data, follow=True) + self.assertContains(response, test_file.name) + + # Ensure attachment is available with correct content + att = models.Attachment.objects.get(followup__ticket=response.context['ticket']) + with open(os.path.join(MEDIA_DIR, att.file.name)) as file_on_disk: + disk_content = smart_text(file_on_disk.read(), 'utf-8') + self.assertEqual(disk_content, 'โจ') + + +@mock.patch.object(models.FollowUp, 'save', autospec=True) +@mock.patch.object(models.Ticket, 'save', autospec=True) +@mock.patch.object(models.Queue, 'save', autospec=True) +class AttachmentUnitTests(TestCase): + + def setUp(self): + self.file_attrs = { + 'filename': '°ßäöü.txt', + 'content': 'โจ'.encode('utf-8'), + 'content-type': 'text/utf8', + } + self.test_file = SimpleUploadedFile.from_dict(self.file_attrs) + self.follow_up = models.FollowUp(ticket=models.Ticket(queue=models.Queue())) + + @mock.patch('helpdesk.lib.Attachment', autospec=True) + def test_unicode_attachment_filename(self, mock_att_save, mock_queue_save, mock_ticket_save, mock_follow_up_save): + """ check utf-8 data is parsed correcltly """ + filename, fileobj = lib.process_attachments(self.follow_up, [self.test_file])[0] + mock_att_save.assert_called_with( + file=self.test_file, + filename=self.file_attrs['filename'], + mime_type=self.file_attrs['content-type'], + size=len(self.file_attrs['content']), + followup=self.follow_up + ) + self.assertEqual(filename, self.file_attrs['filename']) + + @mock.patch.object(lib.Attachment, 'save', autospec=True) + @override_settings(MEDIA_ROOT=MEDIA_DIR) + def test_unicode_filename_to_filesystem(self, mock_att_save, mock_queue_save, mock_ticket_save, mock_follow_up_save): + """ don't mock saving to filesystem to test file renames caused by storage layer """ + filename, fileobj = lib.process_attachments(self.follow_up, [self.test_file])[0] + # Attachment object was zeroth positional arg (i.e. self) of att.save call + attachment_obj = mock_att_save.call_args[0][0] + + mock_att_save.assert_called_once_with(attachment_obj) + self.assertIsInstance(attachment_obj, models.Attachment) + self.assertEqual(attachment_obj.filename, self.file_attrs['filename']) + + +def tearDownModule(): + try: + shutil.rmtree(MEDIA_DIR) + except OSError: + pass diff --git a/helpdesk/tests/test_get_email.py b/helpdesk/tests/test_get_email.py index 5997e76f..3a296514 100644 --- a/helpdesk/tests/test_get_email.py +++ b/helpdesk/tests/test_get_email.py @@ -126,11 +126,7 @@ class GetEmailParametricTemplate(object): mocked_imaplib_server.fetch = mock.Mock(side_effect=lambda x, _: imap_emails[x]) with mock.patch('helpdesk.management.commands.get_email.imaplib', autospec=True) as mocked_imaplib: mocked_imaplib.IMAP4 = mock.Mock(return_value=mocked_imaplib_server) - try: - call_command('get_email') - except UnboundLocalError: - # known bug fixed by a subsequent commit - return True + call_command('get_email') ticket1 = get_object_or_404(Ticket, pk=1) self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id) @@ -140,6 +136,7 @@ class GetEmailParametricTemplate(object): self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id) self.assertEqual(ticket2.description, "This is the helpdesk comment via email.") + # build matrix of test cases case_methods = [c[0] for c in Queue._meta.get_field('email_box_type').choices] case_socks = [False] + [c[0] for c in Queue._meta.get_field('socks_proxy_type').choices] diff --git a/helpdesk/views/staff.py b/helpdesk/views/staff.py index 510efdb1..15b92dbc 100644 --- a/helpdesk/views/staff.py +++ b/helpdesk/views/staff.py @@ -31,6 +31,7 @@ from helpdesk.forms import ( ) from helpdesk.lib import ( send_templated_mail, query_to_dict, apply_query, safe_template_context, + process_attachments, ) from helpdesk.models import ( Ticket, Queue, FollowUp, TicketChange, PreSetReply, Attachment, SavedSearch, @@ -458,26 +459,7 @@ def update_ticket(request, ticket_id, public=False): f.save() - files = [] - if request.FILES: - import mimetypes - for file in request.FILES.getlist('attachment'): - filename = file.name.encode('ascii', 'ignore') - filename = filename.decode("utf-8") - print(filename) - a = Attachment( - followup=f, - filename=filename, - mime_type=file.content_type or 'application/octet-stream', - size=file.size, - ) - a.file.save(filename, file, save=False) - a.save() - - if file.size < getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000): - # Only files smaller than 512kb (or as defined in - # settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email. - files.append([a.filename, a.file]) + files = process_attachments(f, request.FILES.getlist('attachment')) if title != ticket.title: c = TicketChange(