diff --git a/helpdesk/forms.py b/helpdesk/forms.py
index 7222a2cd..013ab270 100644
--- a/helpdesk/forms.py
+++ b/helpdesk/forms.py
@@ -17,7 +17,7 @@ from django.utils.translation import ugettext_lazy as _
from django.contrib.auth import get_user_model
from django.utils import timezone
-from helpdesk.lib import send_templated_mail, safe_template_context
+from helpdesk.lib import send_templated_mail, safe_template_context, process_attachments
from helpdesk.models import (Ticket, Queue, FollowUp, Attachment, IgnoreEmail, TicketCC,
CustomField, TicketCustomFieldValue, TicketDependency)
from helpdesk import settings as helpdesk_settings
@@ -228,28 +228,10 @@ class AbstractTicketForm(CustomFieldMixin, forms.Form):
return followup
def _attach_files_to_follow_up(self, followup):
- attachments = []
- if self.cleaned_data['attachment']:
- import mimetypes
- attachment = self.cleaned_data['attachment']
- filename = attachment.name.replace(' ', '_')
- att = Attachment(
- followup=followup,
- filename=filename,
- mime_type=mimetypes.guess_type(filename)[0] or 'application/octet-stream',
- size=attachment.size,
- )
- att.file.save(attachment.name, attachment, save=False)
- att.save()
-
- if attachment.size < getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000):
- # Only files smaller than 512kb (or as defined in
- # settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email.
- try:
- attachments.append([att.filename, att.file])
- except NotImplementedError:
- pass
- return attachments
+ files = self.cleaned_data['attachment']
+ if files:
+ files = process_attachments(followup, [files])
+ return files
@staticmethod
def _send_messages(ticket, queue, followup, files, user=None):
diff --git a/helpdesk/lib.py b/helpdesk/lib.py
index 3bf24ff9..73d309c0 100644
--- a/helpdesk/lib.py
+++ b/helpdesk/lib.py
@@ -7,6 +7,8 @@ lib.py - Common functions (eg multipart e-mail)
"""
import logging
+import mimetypes
+import os
try:
from base64 import urlsafe_b64encode as b64encode
@@ -17,22 +19,25 @@ try:
except ImportError:
from base64 import decodestring as b64decode
-from django.utils.encoding import smart_str
+from django.conf import settings
from django.db.models import Q
+from django.utils.encoding import smart_text
from django.utils.safestring import mark_safe
+from helpdesk.models import Attachment, EmailTemplate
+
logger = logging.getLogger('helpdesk')
def send_templated_mail(template_name,
- email_context,
+ context,
recipients,
sender=None,
bcc=None,
fail_silently=False,
files=None):
"""
- send_templated_mail() is a warpper around Django's e-mail routines that
+ send_templated_mail() is a wrapper around Django's e-mail routines that
allows us to easily send multipart (text/plain & text/html) e-mails using
templates that are stored in the database. This lets the admin provide
both a text and a HTML template for each message.
@@ -40,7 +45,7 @@ def send_templated_mail(template_name,
template_name is the slug of the template to use for this message (see
models.EmailTemplate)
- email_context is a dictionary to be used when rendering the template
+ context is a dictionary to be used when rendering the template
recipients can be either a string, eg 'a@b.com', or a list of strings.
@@ -53,68 +58,52 @@ def send_templated_mail(template_name,
fail_silently is passed to Django's mail routine. Set to 'True' to ignore
any errors at send time.
- files can be a list of tuple. Each tuple should be a filename to attach,
+ files can be a list of tuples. Each tuple should be a filename to attach,
along with the File objects to be read. files can be blank.
"""
- from django.conf import settings
from django.core.mail import EmailMultiAlternatives
+ from django.template import engines
+ from_string = engines['django'].from_string
from helpdesk.models import EmailTemplate
from helpdesk.settings import HELPDESK_EMAIL_SUBJECT_TEMPLATE, \
HELPDESK_EMAIL_FALLBACK_LOCALE
- import os
- context = email_context
+ locale = context['queue'].get('locale') or HELPDESK_EMAIL_FALLBACK_LOCALE
- if hasattr(context['queue'], 'locale'):
- locale = getattr(context['queue'], 'locale', '')
- else:
- locale = context['queue'].get('locale', HELPDESK_EMAIL_FALLBACK_LOCALE)
- if not locale:
- locale = HELPDESK_EMAIL_FALLBACK_LOCALE
-
- t = None
try:
t = EmailTemplate.objects.get(template_name__iexact=template_name, locale=locale)
except EmailTemplate.DoesNotExist:
- pass
-
- if not t:
try:
t = EmailTemplate.objects.get(template_name__iexact=template_name, locale__isnull=True)
except EmailTemplate.DoesNotExist:
- logger.warning('template "%s" does not exist, no mail sent',
- template_name)
+ logger.warning('template "%s" does not exist, no mail sent', template_name)
return # just ignore if template doesn't exist
- if not sender:
- sender = settings.DEFAULT_FROM_EMAIL
+ subject_part = from_string(
+ HELPDESK_EMAIL_SUBJECT_TEMPLATE % {"subject": t.subject}
+ ).render(context).replace('\n', '').replace('\r', '')
footer_file = os.path.join('helpdesk', locale, 'email_text_footer.txt')
- from django.template import engines
- template_func = engines['django'].from_string
-
- text_part = template_func(
+ text_part = from_string(
"%s{%% include '%s' %%}" % (t.plain_text, footer_file)
).render(context)
email_html_base_file = os.path.join('helpdesk', locale, 'email_html_base.html')
-
# keep new lines in html emails
if 'comment' in context:
- html_txt = context['comment']
- html_txt = html_txt.replace('\r\n', '
')
- context['comment'] = mark_safe(html_txt)
+ context['comment'] = mark_safe(context['comment'].replace('\r\n', '
'))
- html_part = template_func(
+ html_part = from_string(
"{%% extends '%s' %%}{%% block title %%}"
"%s"
"{%% endblock %%}{%% block content %%}%s{%% endblock %%}" %
- (email_html_base_file, t.heading, t.html)).render(context)
+ (email_html_base_file, t.heading, t.html)
+ ).render(context)
- subject_part = template_func(
+ subject_part = from_string(
HELPDESK_EMAIL_SUBJECT_TEMPLATE % {
"subject": t.subject,
}).render(context)
@@ -123,19 +112,16 @@ def send_templated_mail(template_name,
if recipients.find(','):
recipients = recipients.split(',')
elif type(recipients) != list:
- recipients = [recipients, ]
+ recipients = [recipients]
- msg = EmailMultiAlternatives(
- subject_part.replace('\n', '').replace('\r', ''),
- text_part, sender, recipients, bcc=bcc)
+ msg = EmailMultiAlternatives(subject_part, text_part,
+ sender or settings.DEFAULT_FROM_EMAIL,
+ recipients, bcc=bcc)
msg.attach_alternative(html_part, "text/html")
if files:
- for attachment in files:
- file_to_attach = attachment[1]
- file_to_attach.open()
- msg.attach(filename=attachment[0], content=file_to_attach.read())
- file_to_attach.close()
+ for filename, filefield in files:
+ msg.attach(filename, open(filefield.path).read())
return msg.send(fail_silently)
@@ -254,7 +240,6 @@ def text_is_spam(text, request):
# False if it is not spam. If it cannot be checked for some reason, we
# assume it isn't spam.
from django.contrib.sites.models import Site
- from django.conf import settings
try:
from helpdesk.akismet import Akismet
except:
@@ -286,6 +271,32 @@ def text_is_spam(text, request):
'comment_author': '',
}
- return ak.comment_check(smart_str(text), data=ak_data)
+ return ak.comment_check(smart_text(text), data=ak_data)
return False
+
+
+def process_attachments(followup, attached_files):
+ max_email_attachment_size = getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000)
+ attachments = []
+
+ for attached in attached_files:
+ if attached.size:
+ filename = smart_text(attached.name)
+ att = Attachment(
+ followup=followup,
+ file=attached,
+ filename=filename,
+ mime_type=attached.content_type or
+ mimetypes.guess_type(filename, strict=False)[0] or
+ 'application/octet-stream',
+ size=attached.size,
+ )
+ att.save()
+
+ if attached.size < max_email_attachment_size:
+ # Only files smaller than 512kb (or as defined in
+ # settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email.
+ attachments.append([filename, att.file])
+
+ return attachments
diff --git a/helpdesk/management/commands/get_email.py b/helpdesk/management/commands/get_email.py
index e5b40c41..0bf4c0a3 100644
--- a/helpdesk/management/commands/get_email.py
+++ b/helpdesk/management/commands/get_email.py
@@ -11,34 +11,31 @@ scripts/get_email.py - Designed to be run from cron, this script checks the
adding to existing tickets if needed)
"""
+from datetime import timedelta
import email
import imaplib
import mimetypes
+from os import listdir, unlink
+from os.path import isfile, join
import poplib
import re
import socket
-from os import listdir, unlink
-from os.path import isfile, join
-
-from datetime import timedelta
-from email.header import decode_header
-from email.utils import parseaddr, collapse_rfc2231_value
-from optparse import make_option
+from time import ctime
from email_reply_parser import EmailReplyParser
from django.core.files.base import ContentFile
+from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management.base import BaseCommand
from django.db.models import Q
from django.utils.translation import ugettext as _
-from django.utils import six, timezone
+from django.utils import encoding, six, timezone
from helpdesk import settings
-from helpdesk.lib import send_templated_mail, safe_template_context
-from helpdesk.models import Queue, Ticket, FollowUp, Attachment, IgnoreEmail
+from helpdesk.lib import send_templated_mail, safe_template_context, process_attachments
+from helpdesk.models import Queue, Ticket, FollowUp, IgnoreEmail
import logging
-from time import ctime
STRIPPED_SUBJECT_STRINGS = [
@@ -93,24 +90,18 @@ def process_email(quiet=False):
if quiet:
logger.propagate = False # do not propagate to root logger that would log to console
logdir = q.logging_dir or '/var/log/helpdesk/'
- handler = logging.FileHandler(logdir + q.slug + '_get_email.log')
+ handler = logging.FileHandler(join(logdir, q.slug + '_get_email.log'))
logger.addHandler(handler)
if not q.email_box_last_check:
q.email_box_last_check = timezone.now() - timedelta(minutes=30)
- if not q.email_box_interval:
- q.email_box_interval = 0
+ queue_time_delta = timedelta(minutes=q.email_box_interval or 0)
- queue_time_delta = timedelta(minutes=q.email_box_interval)
-
- if (q.email_box_last_check + queue_time_delta) > timezone.now():
- continue
-
- process_queue(q, logger=logger)
-
- q.email_box_last_check = timezone.now()
- q.save()
+ if (q.email_box_last_check + queue_time_delta) < timezone.now():
+ process_queue(q, logger=logger)
+ q.email_box_last_check = timezone.now()
+ q.save()
def process_queue(q, logger):
@@ -165,20 +156,20 @@ def process_queue(q, logger):
server.pass_(q.email_box_pass or settings.QUEUE_EMAIL_BOX_PASSWORD)
messagesInfo = server.list()[1]
- logger.info("Received %s messages from POP3 server" % str(len(messagesInfo)))
+ logger.info("Received %d messages from POP3 server" % len(messagesInfo))
for msg in messagesInfo:
msgNum = msg.split(" ")[0]
- logger.info("Processing message %s" % str(msgNum))
+ logger.info("Processing message %s" % msgNum)
full_message = "\n".join(server.retr(msgNum)[1])
ticket = ticket_from_message(message=full_message, queue=q, logger=logger)
if ticket:
server.dele(msgNum)
- logger.info("Successfully processed message %s, deleted from POP3 server" % str(msgNum))
+ logger.info("Successfully processed message %s, deleted from POP3 server" % msgNum)
else:
- logger.warn("Message %s was not successfully processed, and will be left on POP3 server" % str(msgNum))
+ logger.warn("Message %s was not successfully processed, and will be left on POP3 server" % msgNum)
server.quit()
@@ -207,16 +198,16 @@ def process_queue(q, logger):
status, data = server.search(None, 'NOT', 'DELETED')
if data:
msgnums = data[0].split()
- logger.info("Received %s messages from IMAP server" % str(len(msgnums)))
+ logger.info("Received %d messages from IMAP server" % len(msgnums))
for num in msgnums:
- logger.info("Processing message %s" % str(num))
+ logger.info("Processing message %s" % num)
status, data = server.fetch(num, '(RFC822)')
- ticket = ticket_from_message(message=data[0][1], queue=q, logger=logger)
+ ticket = ticket_from_message(message=encoding.smart_text(data[0][1]), queue=q, logger=logger)
if ticket:
server.store(num, '+FLAGS', '\\Deleted')
- logger.info("Successfully processed message %s, deleted from IMAP server" % str(msgNum))
+ logger.info("Successfully processed message %s, deleted from IMAP server" % num)
else:
- logger.warn("Message %s was not successfully processed, and will be left on IMAP server" % str(msgNum))
+ logger.warn("Message %s was not successfully processed, and will be left on IMAP server" % num)
server.expunge()
server.close()
@@ -225,20 +216,23 @@ def process_queue(q, logger):
elif email_box_type == 'local':
mail_dir = q.email_box_local_dir or '/var/lib/mail/helpdesk/'
mail = [join(mail_dir, f) for f in listdir(mail_dir) if isfile(join(mail_dir, f))]
- logger.info("Found %s messages in local mailbox directory" % str(len(mail)))
- for m in mail:
- logger.info("Processing message %s" % str(m))
+ logger.info("Found %d messages in local mailbox directory" % len(mail))
+
+ logger.info("Found %d messages in local mailbox directory" % len(mail))
+ for i, m in enumerate(mail, 1):
+ logger.info("Processing message %d" % i)
with open(m, 'r') as f:
ticket = ticket_from_message(message=f.read(), queue=q, logger=logger)
- if ticket:
- logger.info("Successfully processed message %s, ticket/comment created." % str(m))
- try:
- unlink(m) # delete message file if ticket was successful
- logger.info("Successfully deleted message %s." % str(m))
- except:
- logger.error("Unable to delete message %s." % str(m))
+ if ticket:
+ logger.info("Successfully processed message %d, ticket/comment created." % i)
+ try:
+ unlink(m) # delete message file if ticket was successful
+ except:
+ logger.error("Unable to delete message %d." % i)
else:
- logger.warn("Message %s was not successfully processed, and will be left in local directory" % str(m))
+ logger.info("Successfully deleted message %d." % i)
+ else:
+ logger.warn("Message %d was not successfully processed, and will be left in local directory" % i)
def decodeUnknown(charset, string):
@@ -256,12 +250,12 @@ def decodeUnknown(charset, string):
return str(string, encoding='utf-8', errors='replace')
except:
return str(string, encoding='iso8859-1', errors='replace')
- return str(string, encoding=charset)
+ return str(string, encoding=charset, errors='replace')
return string
def decode_mail_headers(string):
- decoded = decode_header(string)
+ decoded = email.header.decode_header(string)
if six.PY2:
return u' '.join([unicode(msg, charset or 'utf-8') for msg, charset in decoded])
elif six.PY3:
@@ -270,8 +264,7 @@ def decode_mail_headers(string):
def ticket_from_message(message, queue, logger):
# 'message' must be an RFC822 formatted message.
- msg = message
- message = email.message_from_string(msg)
+ message = email.message_from_string(message)
subject = message.get('subject', _('Created from e-mail'))
subject = decode_mail_headers(decodeUnknown(message.get_charset(), subject))
for affix in STRIPPED_SUBJECT_STRINGS:
@@ -280,10 +273,7 @@ def ticket_from_message(message, queue, logger):
sender = message.get('from', _('Unknown Sender'))
sender = decode_mail_headers(decodeUnknown(message.get_charset(), sender))
-
- sender_email = parseaddr(sender)[1]
-
- body_plain, body_html = '', ''
+ sender_email = email.utils.parseaddr(sender)[1]
for ignore in IgnoreEmail.objects.filter(Q(queues=queue) | Q(queues__isnull=True)):
if ignore.test(sender_email):
@@ -302,6 +292,7 @@ def ticket_from_message(message, queue, logger):
logger.info("No tracking ID matched.")
ticket = None
+ body = None
counter = 0
files = []
@@ -311,80 +302,61 @@ def ticket_from_message(message, queue, logger):
name = part.get_param("name")
if name:
- name = collapse_rfc2231_value(name)
+ name = email.utils.collapse_rfc2231_value(name)
if part.get_content_maintype() == 'text' and name is None:
if part.get_content_subtype() == 'plain':
- body_plain = EmailReplyParser.parse_reply(
- decodeUnknown(part.get_content_charset(), part.get_payload(decode=True)))
+ body = EmailReplyParser.parse_reply(
+ decodeUnknown(part.get_content_charset(), part.get_payload(decode=True))
+ )
logger.debug("Discovered plain text MIME part")
else:
- body_html = part.get_payload(decode=True)
+ files.append(
+ SimpleUploadedFile(_("email_html_body.html"), encoding.smart_bytes(part.get_payload()), 'text/html')
+ )
logger.debug("Discovered HTML MIME part")
else:
if not name:
ext = mimetypes.guess_extension(part.get_content_type())
name = "part-%i%s" % (counter, ext)
-
- files.append({
- 'filename': name,
- 'content': part.get_payload(decode=True),
- 'type': part.get_content_type()},
- )
+ files.append(SimpleUploadedFile(name, encoding.smart_bytes(part.get_payload()), part.get_content_type()))
logger.debug("Found MIME attachment %s" % name)
counter += 1
- if body_plain:
- body = body_plain
- else:
+ if not body:
body = _('No plain-text email body available. Please see attachment "email_html_body.html".')
- if body_html:
- files.append({
- 'filename': _("email_html_body.html"),
- 'content': body_html,
- 'type': 'text/html',
- })
-
- now = timezone.now()
-
if ticket:
try:
t = Ticket.objects.get(id=ticket)
- new = False
- logger.info("Found existing ticket with Tracking ID %s-%s" % (t.queue.slug, t.id))
except Ticket.DoesNotExist:
logger.info("Tracking ID %s-%s not associated with existing ticket. Creating new ticket." % (queue.slug, ticket))
ticket = None
-
- priority = 3
+ else:
+ logger.info("Found existing ticket with Tracking ID %s-%s" % (t.queue.slug, t.id))
+ if t.status == Ticket.CLOSED_STATUS:
+ t.status = Ticket.REOPENED_STATUS
+ t.save()
+ new = False
smtp_priority = message.get('priority', '')
smtp_importance = message.get('importance', '')
-
- high_priority_types = ('high', 'important', '1', 'urgent')
-
- if smtp_priority in high_priority_types or smtp_importance in high_priority_types:
- priority = 2
+ high_priority_types = {'high', 'important', '1', 'urgent'}
+ priority = 2 if high_priority_types & {smtp_priority, smtp_importance} else 3
if ticket is None:
- t = Ticket(
+ new = True
+ t = Ticket.objects.create(
title=subject,
queue=queue,
submitter_email=sender_email,
- created=now,
+ created=timezone.now(),
description=body,
priority=priority,
)
- t.save()
- new = True
logger.debug("Created new ticket %s-%s" % (t.queue.slug, t.id))
- elif t.status == Ticket.CLOSED_STATUS:
- t.status = Ticket.REOPENED_STATUS
- t.save()
-
f = FollowUp(
ticket=t,
title=_('E-Mail Received from %(sender_email)s' % {'sender_email': sender_email}),
@@ -405,28 +377,13 @@ def ticket_from_message(message, queue, logger):
elif six.PY3:
logger.info("[%s-%s] %s" % (t.queue.slug, t.id, t.title,))
- for file in files:
- if file['content']:
- if six.PY2:
- filename = file['filename'].encode('ascii', 'replace').replace(' ', '_')
- elif six.PY3:
- filename = file['filename'].replace(' ', '_')
- filename = re.sub('[^a-zA-Z0-9._-]+', '', filename)
- logger.info("Found attachment '%s'" % filename)
- a = Attachment(
- followup=f,
- filename=filename,
- mime_type=file['type'],
- size=len(file['content']),
- )
- a.file.save(filename, ContentFile(file['content']), save=False)
- a.save()
- logger.info("Attachment '%s' successfully added to ticket." % filename)
+ attached = process_attachments(f, files)
+ for att_file in attached:
+ logger.info("Attachment '%s' successfully added to ticket from email." % att_file[0])
context = safe_template_context(t)
if new:
-
if sender_email:
send_templated_mail(
'newticket_submitter',
@@ -435,7 +392,6 @@ def ticket_from_message(message, queue, logger):
sender=queue.from_address,
fail_silently=True,
)
-
if queue.new_ticket_cc:
send_templated_mail(
'newticket_cc',
@@ -444,7 +400,6 @@ def ticket_from_message(message, queue, logger):
sender=queue.from_address,
fail_silently=True,
)
-
if queue.updated_ticket_cc and queue.updated_ticket_cc != queue.new_ticket_cc:
send_templated_mail(
'newticket_cc',
@@ -453,15 +408,8 @@ def ticket_from_message(message, queue, logger):
sender=queue.from_address,
fail_silently=True,
)
-
else:
context.update(comment=f.comment)
-
- # if t.status == Ticket.REOPENED_STATUS:
- # update = _(' (Reopened)')
- # else:
- # update = _(' (Updated)')
-
if t.assigned_to:
send_templated_mail(
'updated_owner',
@@ -470,7 +418,6 @@ def ticket_from_message(message, queue, logger):
sender=queue.from_address,
fail_silently=True,
)
-
if queue.updated_ticket_cc:
send_templated_mail(
'updated_cc',
diff --git a/helpdesk/models.py b/helpdesk/models.py
index 0c3bd93b..c39e733b 100644
--- a/helpdesk/models.py
+++ b/helpdesk/models.py
@@ -741,7 +741,6 @@ def attachment_path(instance, filename):
putting attachments in a folder off attachments for ticket/followup_id/.
"""
import os
- from django.conf import settings
os.umask(0)
path = 'helpdesk/attachments/%s/%s' % (instance.followup.ticket.ticket_for_url, instance.followup.id)
att_path = os.path.join(settings.MEDIA_ROOT, path)
@@ -784,15 +783,6 @@ class Attachment(models.Model):
help_text=_('Size of this file in bytes'),
)
- def get_upload_to(self, field_attname):
- """ Get upload_to path specific to this item """
- if not self.id:
- return u''
- return u'helpdesk/attachments/%s/%s' % (
- self.followup.ticket.ticket_for_url,
- self.followup.id
- )
-
def __str__(self):
return '%s' % self.filename
diff --git a/helpdesk/tests/test_attachments.py b/helpdesk/tests/test_attachments.py
new file mode 100644
index 00000000..dbc53d51
--- /dev/null
+++ b/helpdesk/tests/test_attachments.py
@@ -0,0 +1,135 @@
+# vim: set fileencoding=utf-8 :
+from __future__ import unicode_literals
+
+import os
+import shutil
+from tempfile import gettempdir
+
+from django.core.files.uploadedfile import SimpleUploadedFile
+from django.core.urlresolvers import reverse
+from django.test import override_settings, TestCase
+from django.utils.encoding import smart_text
+
+from helpdesk import lib, models
+
+try:
+ # Python >= 3.3
+ from unittest import mock
+except ImportError:
+ # Python < 3.3
+ import mock
+
+MEDIA_DIR = os.path.join(gettempdir(), 'helpdesk_test_media')
+
+
+@override_settings(MEDIA_ROOT=MEDIA_DIR)
+class AttachmentIntegrationTests(TestCase):
+
+ fixtures = ['emailtemplate.json']
+
+ def setUp(self):
+ self.queue_public = models.Queue.objects.create(
+ title='Public Queue',
+ slug='pub_q',
+ allow_public_submission=True,
+ new_ticket_cc='new.public@example.com',
+ updated_ticket_cc='update.public@example.com',
+ )
+
+ self.queue_private = models.Queue.objects.create(
+ title='Private Queue',
+ slug='priv_q',
+ allow_public_submission=False,
+ new_ticket_cc='new.private@example.com',
+ updated_ticket_cc='update.private@example.com',
+ )
+
+ self.ticket_data = {
+ 'title': 'Test Ticket Title',
+ 'body': 'Test Ticket Desc',
+ 'priority': 3,
+ 'submitter_email': 'submitter@example.com',
+ }
+
+ def test_create_pub_ticket_with_attachment(self):
+ test_file = SimpleUploadedFile('test_att.txt', b'attached file content', 'text/plain')
+ post_data = self.ticket_data.copy()
+ post_data.update({
+ 'queue': self.queue_public.id,
+ 'attachment': test_file,
+ })
+
+ # Ensure ticket form submits with attachment successfully
+ response = self.client.post(reverse('helpdesk:home'), post_data, follow=True)
+ self.assertContains(response, test_file.name)
+
+ # Ensure attachment is available with correct content
+ att = models.Attachment.objects.get(followup__ticket=response.context['ticket'])
+ with open(os.path.join(MEDIA_DIR, att.file.name)) as file_on_disk:
+ disk_content = file_on_disk.read()
+ self.assertEqual(disk_content, 'attached file content')
+
+ def test_create_pub_ticket_with_attachment_utf8(self):
+ test_file = SimpleUploadedFile('ß°äöü.txt', 'โจ'.encode('utf-8'), 'text/utf-8')
+ post_data = self.ticket_data.copy()
+ post_data.update({
+ 'queue': self.queue_public.id,
+ 'attachment': test_file,
+ })
+
+ # Ensure ticket form submits with attachment successfully
+ response = self.client.post(reverse('helpdesk:home'), post_data, follow=True)
+ self.assertContains(response, test_file.name)
+
+ # Ensure attachment is available with correct content
+ att = models.Attachment.objects.get(followup__ticket=response.context['ticket'])
+ with open(os.path.join(MEDIA_DIR, att.file.name)) as file_on_disk:
+ disk_content = smart_text(file_on_disk.read(), 'utf-8')
+ self.assertEqual(disk_content, 'โจ')
+
+
+@mock.patch.object(models.FollowUp, 'save', autospec=True)
+@mock.patch.object(models.Ticket, 'save', autospec=True)
+@mock.patch.object(models.Queue, 'save', autospec=True)
+class AttachmentUnitTests(TestCase):
+
+ def setUp(self):
+ self.file_attrs = {
+ 'filename': '°ßäöü.txt',
+ 'content': 'โจ'.encode('utf-8'),
+ 'content-type': 'text/utf8',
+ }
+ self.test_file = SimpleUploadedFile.from_dict(self.file_attrs)
+ self.follow_up = models.FollowUp(ticket=models.Ticket(queue=models.Queue()))
+
+ @mock.patch('helpdesk.lib.Attachment', autospec=True)
+ def test_unicode_attachment_filename(self, mock_att_save, mock_queue_save, mock_ticket_save, mock_follow_up_save):
+ """ check utf-8 data is parsed correcltly """
+ filename, fileobj = lib.process_attachments(self.follow_up, [self.test_file])[0]
+ mock_att_save.assert_called_with(
+ file=self.test_file,
+ filename=self.file_attrs['filename'],
+ mime_type=self.file_attrs['content-type'],
+ size=len(self.file_attrs['content']),
+ followup=self.follow_up
+ )
+ self.assertEqual(filename, self.file_attrs['filename'])
+
+ @mock.patch.object(lib.Attachment, 'save', autospec=True)
+ @override_settings(MEDIA_ROOT=MEDIA_DIR)
+ def test_unicode_filename_to_filesystem(self, mock_att_save, mock_queue_save, mock_ticket_save, mock_follow_up_save):
+ """ don't mock saving to filesystem to test file renames caused by storage layer """
+ filename, fileobj = lib.process_attachments(self.follow_up, [self.test_file])[0]
+ # Attachment object was zeroth positional arg (i.e. self) of att.save call
+ attachment_obj = mock_att_save.call_args[0][0]
+
+ mock_att_save.assert_called_once_with(attachment_obj)
+ self.assertIsInstance(attachment_obj, models.Attachment)
+ self.assertEqual(attachment_obj.filename, self.file_attrs['filename'])
+
+
+def tearDownModule():
+ try:
+ shutil.rmtree(MEDIA_DIR)
+ except OSError:
+ pass
diff --git a/helpdesk/tests/test_get_email.py b/helpdesk/tests/test_get_email.py
index 5997e76f..3a296514 100644
--- a/helpdesk/tests/test_get_email.py
+++ b/helpdesk/tests/test_get_email.py
@@ -126,11 +126,7 @@ class GetEmailParametricTemplate(object):
mocked_imaplib_server.fetch = mock.Mock(side_effect=lambda x, _: imap_emails[x])
with mock.patch('helpdesk.management.commands.get_email.imaplib', autospec=True) as mocked_imaplib:
mocked_imaplib.IMAP4 = mock.Mock(return_value=mocked_imaplib_server)
- try:
- call_command('get_email')
- except UnboundLocalError:
- # known bug fixed by a subsequent commit
- return True
+ call_command('get_email')
ticket1 = get_object_or_404(Ticket, pk=1)
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
@@ -140,6 +136,7 @@ class GetEmailParametricTemplate(object):
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
self.assertEqual(ticket2.description, "This is the helpdesk comment via email.")
+
# build matrix of test cases
case_methods = [c[0] for c in Queue._meta.get_field('email_box_type').choices]
case_socks = [False] + [c[0] for c in Queue._meta.get_field('socks_proxy_type').choices]
diff --git a/helpdesk/views/staff.py b/helpdesk/views/staff.py
index 510efdb1..15b92dbc 100644
--- a/helpdesk/views/staff.py
+++ b/helpdesk/views/staff.py
@@ -31,6 +31,7 @@ from helpdesk.forms import (
)
from helpdesk.lib import (
send_templated_mail, query_to_dict, apply_query, safe_template_context,
+ process_attachments,
)
from helpdesk.models import (
Ticket, Queue, FollowUp, TicketChange, PreSetReply, Attachment, SavedSearch,
@@ -458,26 +459,7 @@ def update_ticket(request, ticket_id, public=False):
f.save()
- files = []
- if request.FILES:
- import mimetypes
- for file in request.FILES.getlist('attachment'):
- filename = file.name.encode('ascii', 'ignore')
- filename = filename.decode("utf-8")
- print(filename)
- a = Attachment(
- followup=f,
- filename=filename,
- mime_type=file.content_type or 'application/octet-stream',
- size=file.size,
- )
- a.file.save(filename, file, save=False)
- a.save()
-
- if file.size < getattr(settings, 'MAX_EMAIL_ATTACHMENT_SIZE', 512000):
- # Only files smaller than 512kb (or as defined in
- # settings.MAX_EMAIL_ATTACHMENT_SIZE) are sent via email.
- files.append([a.filename, a.file])
+ files = process_attachments(f, request.FILES.getlist('attachment'))
if title != ticket.title:
c = TicketChange(