Merge pull request #1041 from martin-marty/unstable

Refactoring to reduce complexity of several email-related functions
This commit is contained in:
Garret Wassermann 2022-07-25 01:47:40 -04:00 committed by GitHub
commit e75e977911
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 561 additions and 378 deletions

View File

@ -2,6 +2,7 @@
max-line-length = 120
exclude = .git,__pycache__,.tox,.eggs,*.egg,node_modules,.venv,migrations,docs,demo,tests,setup.py
import-order-style = pep8
max-complexity = 20
[pycodestyle]
max-line-length = 120

View File

@ -28,9 +28,8 @@ jobs:
run: |
pip install flake8
# stop the build if there are Python syntax errors or undefined names
flake8 helpdesk --count --show-source --statistics --exit-zero
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
# flake8 . --count --exit-zero --max-complexity=10 --statistics
flake8 helpdesk --count --show-source --statistics --exit-zero --max-complexity=20
- name: Sort style check with 'isort'
run: |
isort --line-length=120 --src helpdesk . --check

View File

@ -21,7 +21,7 @@ from email.utils import getaddresses
from email_reply_parser import EmailReplyParser
from helpdesk import settings
from helpdesk.lib import process_attachments, safe_template_context
from helpdesk.models import FollowUp, IgnoreEmail, Queue, Ticket, TicketCC
from helpdesk.models import FollowUp, IgnoreEmail, Queue, Ticket
import imaplib
import logging
import mimetypes
@ -33,6 +33,7 @@ import socket
import ssl
import sys
from time import ctime
import typing
# import User model, which may be a custom model
@ -176,13 +177,13 @@ def imap_sync(q, logger, server):
sys.exit()
try:
status, data = server.search(None, 'NOT', 'DELETED')
data = server.search(None, 'NOT', 'DELETED')[1]
if data:
msgnums = data[0].split()
logger.info("Received %d messages from IMAP server" % len(msgnums))
for num in msgnums:
logger.info("Processing message %s" % num)
status, data = server.fetch(num, '(RFC822)')
data = server.fetch(num, '(RFC822)')[1]
full_message = encoding.force_str(data[0][1], errors='replace')
try:
ticket = object_from_message(
@ -345,7 +346,7 @@ def create_ticket_cc(ticket, cc_list):
from helpdesk.views.staff import subscribe_to_ticket_updates, User
new_ticket_ccs = []
for cced_name, cced_email in cc_list:
for __, cced_email in cc_list:
cced_email = cced_email.strip()
if cced_email == ticket.queue.email_address:
@ -354,7 +355,7 @@ def create_ticket_cc(ticket, cc_list):
user = None
try:
user = User.objects.get(email=cced_email)
user = User.objects.get(email=cced_email) # @UndefinedVariable
except User.DoesNotExist:
pass
@ -466,16 +467,6 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
new_ticket_ccs = []
new_ticket_ccs.append(create_ticket_cc(ticket, to_list + cc_list))
notifications_to_be_sent = [sender_email]
if queue.enable_notifications_on_email_events and len(notifications_to_be_sent):
ticket_cc_list = TicketCC.objects.filter(
ticket=ticket).all().values_list('email', flat=True)
for email_address in ticket_cc_list:
notifications_to_be_sent.append(email_address)
autoreply = is_autoreply(message)
if autoreply:
logger.info(
@ -516,7 +507,77 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
return ticket
def object_from_message(message, queue, logger):
def get_ticket_id_from_subject_slug(
queue_slug: str,
subject: str,
logger: logging.Logger
) -> typing.Optional[int]:
"""Get a ticket id from the subject string
Performs a match on the subject using the queue_slug as reference,
returning the ticket id if a match is found.
"""
matchobj = re.match(r".*\[" + queue_slug + r"-(?P<id>\d+)\]", subject)
ticket_id = None
if matchobj:
# This is a reply or forward.
ticket_id = matchobj.group('id')
logger.info("Matched tracking ID %s-%s" % (queue_slug, ticket_id))
else:
logger.info("No tracking ID matched.")
return ticket_id
def add_file_if_always_save_incoming_email_message(
files_,
message: str
) -> None:
"""When `settings.HELPDESK_ALWAYS_SAVE_INCOMING_EMAIL_MESSAGE` is `True`
add a file to the files_ list"""
if getattr(django_settings, 'HELPDESK_ALWAYS_SAVE_INCOMING_EMAIL_MESSAGE', False):
# save message as attachment in case of some complex markup renders
# wrong
files_.append(
SimpleUploadedFile(
_("original_message.eml").replace(
".eml",
timezone.localtime().strftime("_%d-%m-%Y_%H:%M") + ".eml"
),
str(message).encode("utf-8"),
'text/plain'
)
)
def get_encoded_body(body: str) -> str:
try:
return body.encode('ascii').decode('unicode_escape')
except UnicodeEncodeError:
return body
def get_body_from_fragments(body) -> str:
"""Gets a body from the fragments, joined by a double line break"""
return "\n\n".join(f.content for f in EmailReplyParser.read(body).fragments)
def get_email_body_from_part_payload(part) -> str:
"""Gets an decoded body from the payload part, if the decode fails,
returns without encoding"""
try:
return encoding.smart_str(
part.get_payload(decode=True)
)
except UnicodeDecodeError:
return encoding.smart_str(
part.get_payload(decode=False)
)
def object_from_message(message: str,
queue: Queue,
logger: logging.Logger
) -> Ticket:
# 'message' must be an RFC822 formatted message.
message = email.message_from_string(message)
@ -541,20 +602,15 @@ def object_from_message(message, queue, logger):
for ignore in IgnoreEmail.objects.filter(Q(queues=queue) | Q(queues__isnull=True)):
if ignore.test(sender_email):
if ignore.keep_in_mailbox:
# By returning 'False' the message will be kept in the mailbox,
# and the 'True' will cause the message to be deleted.
return False
return True
# By returning 'False' the message will be kept in the mailbox,
# and the 'True' will cause the message to be deleted.
return not ignore.keep_in_mailbox
matchobj = re.match(r".*\[" + queue.slug + r"-(?P<id>\d+)\]", subject)
if matchobj:
# This is a reply or forward.
ticket = matchobj.group('id')
logger.info("Matched tracking ID %s-%s" % (queue.slug, ticket))
else:
logger.info("No tracking ID matched.")
ticket = None
ticket_id: typing.Optional[int] = get_ticket_id_from_subject_slug(
queue.slug,
subject,
logger
)
body = None
full_body = None
@ -578,13 +634,10 @@ def object_from_message(message, queue, logger):
body = decodeUnknown(part.get_content_charset(), body)
# have to use django_settings here so overwritting it works in tests
# the default value is False anyway
if ticket is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False):
if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False):
# first message in thread, we save full body to avoid
# losing forwards and things like that
body_parts = []
for f in EmailReplyParser.read(body).fragments:
body_parts.append(f.content)
full_body = '\n\n'.join(body_parts)
full_body = get_body_from_fragments(body)
body = EmailReplyParser.parse_reply(body)
else:
# second and other reply, save only first part of the
@ -592,18 +645,10 @@ def object_from_message(message, queue, logger):
body = EmailReplyParser.parse_reply(body)
full_body = body
# workaround to get unicode text out rather than escaped text
try:
body = body.encode('ascii').decode('unicode_escape')
except UnicodeEncodeError:
body.encode('utf-8')
body = get_encoded_body(body)
logger.debug("Discovered plain text MIME part")
else:
try:
email_body = encoding.smart_str(
part.get_payload(decode=True))
except UnicodeDecodeError:
email_body = encoding.smart_str(
part.get_payload(decode=False))
email_body = get_email_body_from_part_payload(part)
if not body and not full_body:
# no text has been parsed so far - try such deep parsing
@ -670,19 +715,7 @@ def object_from_message(message, queue, logger):
if not body:
body = ""
if getattr(django_settings, 'HELPDESK_ALWAYS_SAVE_INCOMING_EMAIL_MESSAGE', False):
# save message as attachment in case of some complex markup renders
# wrong
files.append(
SimpleUploadedFile(
_("original_message.eml").replace(
".eml",
timezone.localtime().strftime("_%d-%m-%Y_%H:%M") + ".eml"
),
str(message).encode("utf-8"),
'text/plain'
)
)
add_file_if_always_save_incoming_email_message(files, message)
smtp_priority = message.get('priority', '')
smtp_importance = message.get('importance', '')
@ -700,4 +733,4 @@ def object_from_message(message, queue, logger):
'files': files,
}
return create_object_from_email_message(message, ticket, payload, files, logger=logger)
return create_object_from_email_message(message, ticket_id, payload, files, logger=logger)

View File

@ -1964,6 +1964,10 @@ class TicketCustomFieldValue(models.Model):
def __str__(self):
return '%s / %s' % (self.ticket, self.field)
@property
def default_value(self) -> str:
return _("Not defined")
class Meta:
unique_together = (('ticket', 'field'),)
verbose_name = _('Ticket custom field value')

View File

@ -6,9 +6,9 @@ django-helpdesk - A Django powered ticket tracker for small enterprise.
views/staff.py - The bulk of the application - provides most business logic and
renders all staff-facing views.
"""
from ..lib import format_time_spent
from ..templated_email import send_templated_mail
from collections import defaultdict
from copy import deepcopy
from datetime import date, datetime, timedelta
from django.conf import settings
@ -16,6 +16,7 @@ from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import user_passes_test
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import PermissionDenied, ValidationError
from django.core.handlers.wsgi import WSGIRequest
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.db.models import Q
from django.http import Http404, HttpResponse, HttpResponseRedirect, JsonResponse
@ -70,11 +71,15 @@ import json
import re
from rest_framework import status
from rest_framework.decorators import api_view
import typing
if helpdesk_settings.HELPDESK_KB_ENABLED:
from helpdesk.models import KBItem
DATE_RE: re.Pattern = re.compile(
r'(?P<month>\d{1,2})/(?P<day>\d{1,2})/(?P<year>\d{4})$'
)
User = get_user_model()
Query = get_query_class()
@ -268,7 +273,7 @@ def followup_edit(request, ticket_id, followup_id):
'time_spent': format_time_spent(followup.time_spent),
})
ticketcc_string, show_subscribe = \
ticketcc_string, __ = \
return_ticketccstring_and_show_subscribe(request.user, ticket)
return render(request, 'helpdesk/followup_edit.html', {
@ -346,8 +351,10 @@ def view_ticket(request, ticket_id):
if 'subscribe' in request.GET:
# Allow the user to subscribe him/herself to the ticket whilst viewing
# it.
ticket_cc, show_subscribe = \
return_ticketccstring_and_show_subscribe(request.user, ticket)
show_subscribe = return_ticketccstring_and_show_subscribe(
request.user, ticket
)[1]
if show_subscribe:
subscribe_staff_member_to_ticket(ticket, request.user)
return HttpResponseRedirect(reverse('helpdesk:view', args=[ticket.id]))
@ -481,10 +488,20 @@ def subscribe_staff_member_to_ticket(ticket, user, email='', can_view=True, can_
return subscribe_to_ticket_updates(ticket=ticket, user=user, email=email, can_view=can_view, can_update=can_update)
def update_ticket(request, ticket_id, public=False):
def get_ticket_from_request_with_authorisation(
request: WSGIRequest,
ticket_id: str,
public: bool
) -> typing.Union[
Ticket, typing.NoReturn
]:
"""Gets a ticket from the public status and if the user is authenticated and
has permissions to update tickets
ticket = None
Raises:
Http404 when the ticket can not be found or the user lacks permission
"""
if not (public or (
request.user.is_authenticated and
request.user.is_active and (
@ -506,41 +523,29 @@ def update_ticket(request, ticket_id, public=False):
'%s?next=%s' % (reverse('helpdesk:login'), request.path)
)
if not ticket:
ticket = get_object_or_404(Ticket, id=ticket_id)
return get_object_or_404(Ticket, id=ticket_id)
date_re = re.compile(
r'(?P<month>\d{1,2})/(?P<day>\d{1,2})/(?P<year>\d{4})$'
)
comment = request.POST.get('comment', '')
new_status = int(request.POST.get('new_status', ticket.status))
title = request.POST.get('title', '')
public = request.POST.get('public', False)
owner = int(request.POST.get('owner', -1))
priority = int(request.POST.get('priority', ticket.priority))
due_date_year = int(request.POST.get('due_date_year', 0))
due_date_month = int(request.POST.get('due_date_month', 0))
due_date_day = int(request.POST.get('due_date_day', 0))
if request.POST.get("time_spent"):
(hours, minutes) = [int(f)
for f in request.POST.get("time_spent").split(":")]
time_spent = timedelta(hours=hours, minutes=minutes)
else:
time_spent = None
# NOTE: jQuery's default for dates is mm/dd/yy
# very US-centric but for now that's the only format supported
# until we clean up code to internationalize a little more
def get_due_date_from_request_or_ticket(
request: WSGIRequest,
ticket: Ticket
) -> typing.Optional[datetime.date]:
"""Tries to locate the due date for a ticket from the `request.POST`
'due_date' parameter or the `due_date_*` paramaters.
"""
due_date = request.POST.get('due_date', None) or None
if due_date is not None:
# based on Django code to parse dates:
# https://docs.djangoproject.com/en/2.0/_modules/django/utils/dateparse/
match = date_re.match(due_date)
match = DATE_RE.match(due_date)
if match:
kw = {k: int(v) for k, v in match.groupdict().items()}
due_date = date(**kw)
else:
due_date_year = int(request.POST.get('due_date_year', 0))
due_date_month = int(request.POST.get('due_date_month', 0))
due_date_day = int(request.POST.get('due_date_day', 0))
# old way, probably deprecated?
if not (due_date_year and due_date_month and due_date_day):
due_date = ticket.due_date
@ -551,9 +556,146 @@ def update_ticket(request, ticket_id, public=False):
due_date = ticket.due_date
else:
due_date = timezone.now()
due_date = due_date.replace(
due_date_year, due_date_month, due_date_day)
due_date = due_date.replace(
due_date_year, due_date_month, due_date_day)
return due_date
def get_and_set_ticket_status(
new_status: str,
ticket: Ticket,
follow_up: FollowUp
) -> typing.Tuple[str, str]:
"""Performs comparision on previous status to new status,
updating the title as required.
Returns:
The old status as a display string, old status code string
"""
old_status_str = ticket.get_status_display()
old_status = ticket.status
if new_status != ticket.status:
ticket.status = new_status
ticket.save()
follow_up.new_status = new_status
if follow_up.title:
follow_up.title += ' and %s' % ticket.get_status_display()
else:
follow_up.title = '%s' % ticket.get_status_display()
if not follow_up.title:
if follow_up.comment:
follow_up.title = _('Comment')
else:
follow_up.title = _('Updated')
follow_up.save()
return (old_status_str, old_status)
def get_time_spent_from_request(request: WSGIRequest) -> typing.Optional[timedelta]:
if request.POST.get("time_spent"):
(hours, minutes) = [int(f)
for f in request.POST.get("time_spent").split(":")]
return timedelta(hours=hours, minutes=minutes)
return None
def update_messages_sent_to_by_public_and_status(
public: bool,
ticket: Ticket,
follow_up: FollowUp,
context: str,
messages_sent_to: typing.List[str],
files: typing.List[typing.Tuple[str, str]]
) -> Ticket:
"""Sets the status of the ticket"""
if public and (
follow_up.comment or (
follow_up.new_status in (
Ticket.RESOLVED_STATUS,
Ticket.CLOSED_STATUS
)
)
):
if follow_up.new_status == Ticket.RESOLVED_STATUS:
template = 'resolved_'
elif follow_up.new_status == Ticket.CLOSED_STATUS:
template = 'closed_'
else:
template = 'updated_'
roles = {
'submitter': (template + 'submitter', context),
'ticket_cc': (template + 'cc', context),
}
if ticket.assigned_to and ticket.assigned_to.usersettings_helpdesk.email_on_ticket_change:
roles['assigned_to'] = (template + 'cc', context)
messages_sent_to.update(
ticket.send(
roles,
dont_send_to=messages_sent_to,
fail_silently=True,
files=files
)
)
return ticket
def add_staff_subscription(
request: WSGIRequest,
ticket: Ticket
) -> None:
"""Auto subscribe the staff member if that's what the settigs say and the
user is authenticated and a staff member"""
if helpdesk_settings.HELPDESK_AUTO_SUBSCRIBE_ON_TICKET_RESPONSE and request.user.is_authenticated:
SHOW_SUBSCRIBE = return_ticketccstring_and_show_subscribe(
request.user, ticket
)[1]
if SHOW_SUBSCRIBE:
subscribe_staff_member_to_ticket(ticket, request.user)
def get_template_staff_and_template_cc(
reassigned, follow_up: FollowUp
) -> typing.Tuple[str, str]:
if reassigned:
template_staff = 'assigned_owner'
elif follow_up.new_status == Ticket.RESOLVED_STATUS:
template_staff = 'resolved_owner'
elif follow_up.new_status == Ticket.CLOSED_STATUS:
template_staff = 'closed_owner'
else:
template_staff = 'updated_owner'
if reassigned:
template_cc = 'assigned_cc'
elif follow_up.new_status == Ticket.RESOLVED_STATUS:
template_cc = 'resolved_cc'
elif follow_up.new_status == Ticket.CLOSED_STATUS:
template_cc = 'closed_cc'
else:
template_cc = 'updated_cc'
return template_staff, template_cc
def update_ticket(request, ticket_id, public=False):
ticket = get_ticket_from_request_with_authorisation(request, ticket_id, public)
comment = request.POST.get('comment', '')
new_status = int(request.POST.get('new_status', ticket.status))
title = request.POST.get('title', '')
public = request.POST.get('public', False)
owner = int(request.POST.get('owner', -1))
priority = int(request.POST.get('priority', ticket.priority))
time_spent = get_time_spent_from_request(request)
# NOTE: jQuery's default for dates is mm/dd/yy
# very US-centric but for now that's the only format supported
# until we clean up code to internationalize a little more
due_date = get_due_date_from_request_or_ticket(request, ticket)
no_changes = all([
not request.FILES,
not comment,
@ -613,28 +755,9 @@ def update_ticket(request, ticket_id, public=False):
f.title = _('Unassigned')
ticket.assigned_to = None
old_status_str = ticket.get_status_display()
old_status = ticket.status
if new_status != ticket.status:
ticket.status = new_status
ticket.save()
f.new_status = new_status
if f.title:
f.title += ' and %s' % ticket.get_status_display()
else:
f.title = '%s' % ticket.get_status_display()
old_status_str, old_status = get_and_set_ticket_status(new_status, ticket, f)
if not f.title:
if f.comment:
f.title = _('Comment')
else:
f.title = _('Updated')
f.save()
files = []
if request.FILES:
files = process_attachments(f, request.FILES.getlist('attachment'))
files = process_attachments(f, request.FILES.getlist('attachment')) if request.FILES else []
if title and title != ticket.title:
c = TicketChange(
@ -684,9 +807,12 @@ def update_ticket(request, ticket_id, public=False):
c.save()
ticket.due_date = due_date
if new_status in (Ticket.RESOLVED_STATUS, Ticket.CLOSED_STATUS):
if new_status == Ticket.RESOLVED_STATUS or ticket.resolution is None:
ticket.resolution = comment
if new_status in (
Ticket.RESOLVED_STATUS, Ticket.CLOSED_STATUS
) and (
new_status == Ticket.RESOLVED_STATUS or ticket.resolution is None
):
ticket.resolution = comment
# ticket might have changed above, so we re-instantiate context with the
# (possibly) updated ticket.
@ -701,34 +827,16 @@ def update_ticket(request, ticket_id, public=False):
messages_sent_to.add(request.user.email)
except AttributeError:
pass
if public and (f.comment or (
f.new_status in (Ticket.RESOLVED_STATUS,
Ticket.CLOSED_STATUS))):
if f.new_status == Ticket.RESOLVED_STATUS:
template = 'resolved_'
elif f.new_status == Ticket.CLOSED_STATUS:
template = 'closed_'
else:
template = 'updated_'
roles = {
'submitter': (template + 'submitter', context),
'ticket_cc': (template + 'cc', context),
}
if ticket.assigned_to and ticket.assigned_to.usersettings_helpdesk.email_on_ticket_change:
roles['assigned_to'] = (template + 'cc', context)
messages_sent_to.update(ticket.send(
roles, dont_send_to=messages_sent_to, fail_silently=True, files=files,))
if reassigned:
template_staff = 'assigned_owner'
elif f.new_status == Ticket.RESOLVED_STATUS:
template_staff = 'resolved_owner'
elif f.new_status == Ticket.CLOSED_STATUS:
template_staff = 'closed_owner'
else:
template_staff = 'updated_owner'
ticket = update_messages_sent_to_by_public_and_status(
public,
ticket,
f,
context,
messages_sent_to,
files
)
template_staff, template_cc = get_template_staff_and_template_cc(reassigned, f)
if ticket.assigned_to and (
ticket.assigned_to.usersettings_helpdesk.email_on_ticket_change
or (reassigned and ticket.assigned_to.usersettings_helpdesk.email_on_ticket_assign)
@ -740,30 +848,16 @@ def update_ticket(request, ticket_id, public=False):
files=files,
))
if reassigned:
template_cc = 'assigned_cc'
elif f.new_status == Ticket.RESOLVED_STATUS:
template_cc = 'resolved_cc'
elif f.new_status == Ticket.CLOSED_STATUS:
template_cc = 'closed_cc'
else:
template_cc = 'updated_cc'
messages_sent_to.update(ticket.send(
{'ticket_cc': (template_cc, context)},
dont_send_to=messages_sent_to,
fail_silently=True,
files=files,
))
ticket.save()
# auto subscribe user if enabled
if helpdesk_settings.HELPDESK_AUTO_SUBSCRIBE_ON_TICKET_RESPONSE and request.user.is_authenticated:
ticketcc_string, SHOW_SUBSCRIBE = return_ticketccstring_and_show_subscribe(
request.user, ticket)
if SHOW_SUBSCRIBE:
subscribe_staff_member_to_ticket(ticket, request.user)
add_staff_subscription(request, ticket)
return return_to_ticket(request.user, helpdesk_settings, ticket)
@ -895,7 +989,7 @@ mass_update = staff_member_required(mass_update)
# Prepare ticket attributes which will be displayed in the table to choose
# which value to keep when merging
ticket_attributes = (
TICKET_ATTRIBUTES = (
('created', _('Created date')),
('due_date', _('Due on')),
('get_status_display', _('Status')),
@ -906,6 +1000,133 @@ ticket_attributes = (
)
def merge_ticket_values(
request: WSGIRequest,
tickets: typing.List[Ticket],
custom_fields
) -> None:
for ticket in tickets:
ticket.values = {}
# Prepare the value for each attributes of this ticket
for attribute, __ in TICKET_ATTRIBUTES:
value = getattr(ticket, attribute, TicketCustomFieldValue.default_value)
# Check if attr is a get_FIELD_display
if attribute.startswith('get_') and attribute.endswith('_display'):
# Hack to call methods like get_FIELD_display()
value = getattr(ticket, attribute, TicketCustomFieldValue.default_value)()
ticket.values[attribute] = {
'value': value,
'checked': str(ticket.id) == request.POST.get(attribute)
}
# Prepare the value for each custom fields of this ticket
for custom_field in custom_fields:
try:
value = ticket.ticketcustomfieldvalue_set.get(
field=custom_field).value
except (TicketCustomFieldValue.DoesNotExist, ValueError):
value = TicketCustomFieldValue.default_value
ticket.values[custom_field.name] = {
'value': value,
'checked': str(ticket.id) == request.POST.get(custom_field.name)
}
def redirect_from_chosen_ticket(
request,
chosen_ticket,
tickets,
custom_fields
) -> HttpResponseRedirect:
# Save ticket fields values
for attribute, __ in TICKET_ATTRIBUTES:
id_for_attribute = request.POST.get(attribute)
if id_for_attribute != chosen_ticket.id:
try:
selected_ticket = tickets.get(id=id_for_attribute)
except (Ticket.DoesNotExist, ValueError):
continue
# Check if attr is a get_FIELD_display
if attribute.startswith('get_') and attribute.endswith('_display'):
# Keep only the FIELD part
attribute = attribute[4:-8]
# Get value from selected ticket and then save it on
# the chosen ticket
value = getattr(selected_ticket, attribute)
setattr(chosen_ticket, attribute, value)
# Save custom fields values
for custom_field in custom_fields:
id_for_custom_field = request.POST.get(custom_field.name)
if id_for_custom_field != chosen_ticket.id:
try:
selected_ticket = tickets.get(
id=id_for_custom_field)
except (Ticket.DoesNotExist, ValueError):
continue
# Check if the value for this ticket custom field
# exists
try:
value = selected_ticket.ticketcustomfieldvalue_set.get(
field=custom_field).value
except TicketCustomFieldValue.DoesNotExist:
continue
# Create the custom field value or update it with the
# value from the selected ticket
custom_field_value, created = chosen_ticket.ticketcustomfieldvalue_set.get_or_create(
field=custom_field,
defaults={'value': value}
)
if not created:
custom_field_value.value = value
custom_field_value.save(update_fields=['value'])
# Save changes
chosen_ticket.save()
# For other tickets, save the link to the ticket in which they have been merged to
# and set status to DUPLICATE
for ticket in tickets.exclude(id=chosen_ticket.id):
ticket.merged_to = chosen_ticket
ticket.status = Ticket.DUPLICATE_STATUS
ticket.save()
# Send mail to submitter email and ticket CC to let them
# know ticket has been merged
context = safe_template_context(ticket)
if ticket.submitter_email:
send_templated_mail(
template_name='merged',
context=context,
recipients=[ticket.submitter_email],
bcc=[
cc.email_address for cc in ticket.ticketcc_set.select_related('user')],
sender=ticket.queue.from_address,
fail_silently=True
)
# Move all followups and update their title to know they
# come from another ticket
ticket.followup_set.update(
ticket=chosen_ticket,
# Next might exceed maximum 200 characters limit
title=_('[Merged from #%(id)d] %(title)s') % {
'id': ticket.id, 'title': ticket.title}
)
# Add submitter_email, assigned_to email and ticketcc to
# chosen ticket if necessary
chosen_ticket.add_email_to_ticketcc_if_not_in(
email=ticket.submitter_email)
if ticket.assigned_to and ticket.assigned_to.email:
chosen_ticket.add_email_to_ticketcc_if_not_in(
email=ticket.assigned_to.email)
for ticketcc in ticket.ticketcc_set.all():
chosen_ticket.add_email_to_ticketcc_if_not_in(
ticketcc=ticketcc)
return redirect(chosen_ticket)
@staff_member_required
def merge_tickets(request):
"""
@ -920,31 +1141,8 @@ def merge_tickets(request):
tickets = ticket_select_form.cleaned_data.get('tickets')
custom_fields = CustomField.objects.all()
default = _('Not defined')
for ticket in tickets:
ticket.values = {}
# Prepare the value for each attributes of this ticket
for attribute, display_name in ticket_attributes:
value = getattr(ticket, attribute, default)
# Check if attr is a get_FIELD_display
if attribute.startswith('get_') and attribute.endswith('_display'):
# Hack to call methods like get_FIELD_display()
value = getattr(ticket, attribute, default)()
ticket.values[attribute] = {
'value': value,
'checked': str(ticket.id) == request.POST.get(attribute)
}
# Prepare the value for each custom fields of this ticket
for custom_field in custom_fields:
try:
value = ticket.ticketcustomfieldvalue_set.get(
field=custom_field).value
except (TicketCustomFieldValue.DoesNotExist, ValueError):
value = default
ticket.values[custom_field.name] = {
'value': value,
'checked': str(ticket.id) == request.POST.get(custom_field.name)
}
merge_ticket_values(request, tickets, custom_fields)
if request.method == 'POST':
# Find which ticket has been chosen to be the main one
@ -958,103 +1156,58 @@ def merge_tickets(request):
'Please choose a ticket in which the others will be merged into.')
)
else:
# Save ticket fields values
for attribute, display_name in ticket_attributes:
id_for_attribute = request.POST.get(attribute)
if id_for_attribute != chosen_ticket.id:
try:
selected_ticket = tickets.get(id=id_for_attribute)
except (Ticket.DoesNotExist, ValueError):
continue
# Check if attr is a get_FIELD_display
if attribute.startswith('get_') and attribute.endswith('_display'):
# Keep only the FIELD part
attribute = attribute[4:-8]
# Get value from selected ticket and then save it on
# the chosen ticket
value = getattr(selected_ticket, attribute)
setattr(chosen_ticket, attribute, value)
# Save custom fields values
for custom_field in custom_fields:
id_for_custom_field = request.POST.get(custom_field.name)
if id_for_custom_field != chosen_ticket.id:
try:
selected_ticket = tickets.get(
id=id_for_custom_field)
except (Ticket.DoesNotExist, ValueError):
continue
# Check if the value for this ticket custom field
# exists
try:
value = selected_ticket.ticketcustomfieldvalue_set.get(
field=custom_field).value
except TicketCustomFieldValue.DoesNotExist:
continue
# Create the custom field value or update it with the
# value from the selected ticket
custom_field_value, created = chosen_ticket.ticketcustomfieldvalue_set.get_or_create(
field=custom_field,
defaults={'value': value}
)
if not created:
custom_field_value.value = value
custom_field_value.save(update_fields=['value'])
# Save changes
chosen_ticket.save()
# For other tickets, save the link to the ticket in which they have been merged to
# and set status to DUPLICATE
for ticket in tickets.exclude(id=chosen_ticket.id):
ticket.merged_to = chosen_ticket
ticket.status = Ticket.DUPLICATE_STATUS
ticket.save()
# Send mail to submitter email and ticket CC to let them
# know ticket has been merged
context = safe_template_context(ticket)
if ticket.submitter_email:
send_templated_mail(
template_name='merged',
context=context,
recipients=[ticket.submitter_email],
bcc=[
cc.email_address for cc in ticket.ticketcc_set.select_related('user')],
sender=ticket.queue.from_address,
fail_silently=True
)
# Move all followups and update their title to know they
# come from another ticket
ticket.followup_set.update(
ticket=chosen_ticket,
# Next might exceed maximum 200 characters limit
title=_('[Merged from #%(id)d] %(title)s') % {
'id': ticket.id, 'title': ticket.title}
)
# Add submitter_email, assigned_to email and ticketcc to
# chosen ticket if necessary
chosen_ticket.add_email_to_ticketcc_if_not_in(
email=ticket.submitter_email)
if ticket.assigned_to and ticket.assigned_to.email:
chosen_ticket.add_email_to_ticketcc_if_not_in(
email=ticket.assigned_to.email)
for ticketcc in ticket.ticketcc_set.all():
chosen_ticket.add_email_to_ticketcc_if_not_in(
ticketcc=ticketcc)
return redirect(chosen_ticket)
return redirect_from_chosen_ticket(
request,
chosen_ticket,
tickets,
custom_fields
)
return render(request, 'helpdesk/ticket_merge.html', {
'tickets': tickets,
'ticket_attributes': ticket_attributes,
'ticket_attributes': TICKET_ATTRIBUTES,
'custom_fields': custom_fields,
'ticket_select_form': ticket_select_form
})
def check_redirect_on_user_query(request, huser):
"""If the user is coming from the header/navigation search box, lets' first
look at their query to see if they have entered a valid ticket number. If
they have, just redirect to that ticket number. Otherwise, we treat it as
a keyword search.
"""
if request.GET.get('search_type', None) == 'header':
query = request.GET.get('q')
filter_ = None
if query.find('-') > 0:
try:
queue, id_ = Ticket.queue_and_id_from_query(query)
id_ = int(id)
except ValueError:
id_ = None
if id_:
filter_ = {'queue__slug': queue, 'id': id_}
else:
try:
query = int(query)
except ValueError:
query = None
if query:
filter_ = {'id': int(query)}
if filter_:
try:
ticket = huser.get_tickets_in_queues().get(**filter_)
return HttpResponseRedirect(ticket.staff_url)
except Ticket.DoesNotExist:
# Go on to standard keyword searching
pass
return None
@helpdesk_staff_member_required
def ticket_list(request):
context = {}
@ -1079,40 +1232,10 @@ def ticket_list(request):
'sortreverse': False,
}
# If the user is coming from the header/navigation search box, lets' first
# look at their query to see if they have entered a valid ticket number. If
# they have, just redirect to that ticket number. Otherwise, we treat it as
# a keyword search.
if request.GET.get('search_type', None) == 'header':
query = request.GET.get('q')
filter = None
if query.find('-') > 0:
try:
queue, id = Ticket.queue_and_id_from_query(query)
id = int(id)
except ValueError:
id = None
if id:
filter = {'queue__slug': queue, 'id': id}
else:
try:
query = int(query)
except ValueError:
query = None
if query:
filter = {'id': int(query)}
if filter:
try:
ticket = huser.get_tickets_in_queues().get(**filter)
return HttpResponseRedirect(ticket.staff_url)
except Ticket.DoesNotExist:
# Go on to standard keyword searching
pass
#: check for a redirect, see function doc for details
redirect = check_redirect_on_user_query(request, huser)
if redirect:
return redirect
try:
saved_query, query_params = load_saved_query(request, query_params)
except QueryLoadError:
@ -1308,15 +1431,15 @@ class CreateTicketView(MustBeStaffMixin, abstract_views.AbstractCreateTicketMixi
@helpdesk_staff_member_required
def raw_details(request, type):
def raw_details(request, type_):
# TODO: This currently only supports spewing out 'PreSetReply' objects,
# in the future it needs to be expanded to include other items. All it
# does is return a plain-text representation of an object.
if type not in ('preset',):
if type_ not in ('preset',):
raise Http404
if type == 'preset' and request.GET.get('id', False):
if type_ == 'preset' and request.GET.get('id', False):
try:
preset = PreSetReply.objects.get(id=request.GET.get('id'))
return HttpResponse(preset.body)
@ -1416,12 +1539,18 @@ def report_index(request):
report_index = staff_member_required(report_index)
@helpdesk_staff_member_required
def run_report(request, report):
def get_report_queryset_or_redirect(request, report):
if Ticket.objects.all().count() == 0 or report not in (
'queuemonth', 'usermonth', 'queuestatus', 'queuepriority', 'userstatus',
'userpriority', 'userqueue', 'daysuntilticketclosedbymonth'):
return HttpResponseRedirect(reverse("helpdesk:report_index"))
"queuemonth",
"usermonth",
"queuestatus",
"queuepriority",
"userstatus",
"userpriority",
"userqueue",
"daysuntilticketclosedbymonth"
):
return None, None, HttpResponseRedirect(reverse("helpdesk:report_index"))
report_queryset = Ticket.objects.all().select_related().filter(
queue__in=HelpdeskUser(request.user).get_queues()
@ -1430,12 +1559,79 @@ def run_report(request, report):
try:
saved_query, query_params = load_saved_query(request)
except QueryLoadError:
return HttpResponseRedirect(reverse('helpdesk:report_index'))
return None, HttpResponseRedirect(reverse('helpdesk:report_index'))
return report_queryset, query_params, saved_query, None
def get_report_table_and_totals(header1, summarytable, possible_options):
table = []
totals = {}
for item in header1:
data = []
for hdr in possible_options:
if hdr not in totals.keys():
totals[hdr] = summarytable[item, hdr]
else:
totals[hdr] += summarytable[item, hdr]
data.append(summarytable[item, hdr])
table.append([item] + data)
return table, totals
def update_summary_tables(report_queryset, report, summarytable, summarytable2):
metric3 = False
for ticket in report_queryset:
if report == 'userpriority':
metric1 = u'%s' % ticket.get_assigned_to
metric2 = u'%s' % ticket.get_priority_display()
elif report == 'userqueue':
metric1 = u'%s' % ticket.get_assigned_to
metric2 = u'%s' % ticket.queue.title
elif report == 'userstatus':
metric1 = u'%s' % ticket.get_assigned_to
metric2 = u'%s' % ticket.get_status_display()
elif report == 'usermonth':
metric1 = u'%s' % ticket.get_assigned_to
metric2 = u'%s-%s' % (ticket.created.year, ticket.created.month)
elif report == 'queuepriority':
metric1 = u'%s' % ticket.queue.title
metric2 = u'%s' % ticket.get_priority_display()
elif report == 'queuestatus':
metric1 = u'%s' % ticket.queue.title
metric2 = u'%s' % ticket.get_status_display()
elif report == 'queuemonth':
metric1 = u'%s' % ticket.queue.title
metric2 = u'%s-%s' % (ticket.created.year, ticket.created.month)
elif report == 'daysuntilticketclosedbymonth':
metric1 = u'%s' % ticket.queue.title
metric2 = u'%s-%s' % (ticket.created.year, ticket.created.month)
metric3 = ticket.modified - ticket.created
metric3 = metric3.days
summarytable[metric1, metric2] += 1
if metric3:
if report == 'daysuntilticketclosedbymonth':
summarytable2[metric1, metric2] += metric3
@helpdesk_staff_member_required
def run_report(request, report):
report_queryset, query_params, saved_query, redirect = get_report_queryset_or_redirect(
request, report
)
if redirect:
return redirect
if request.GET.get('saved_query', None):
Query(report_queryset, query_to_base64(query_params))
from collections import defaultdict
summarytable = defaultdict(int)
# a second table for more complex queries
summarytable2 = defaultdict(int)
@ -1510,50 +1706,7 @@ def run_report(request, report):
col1heading = _('Queue')
possible_options = periods
charttype = 'date'
metric3 = False
for ticket in report_queryset:
if report == 'userpriority':
metric1 = u'%s' % ticket.get_assigned_to
metric2 = u'%s' % ticket.get_priority_display()
elif report == 'userqueue':
metric1 = u'%s' % ticket.get_assigned_to
metric2 = u'%s' % ticket.queue.title
elif report == 'userstatus':
metric1 = u'%s' % ticket.get_assigned_to
metric2 = u'%s' % ticket.get_status_display()
elif report == 'usermonth':
metric1 = u'%s' % ticket.get_assigned_to
metric2 = u'%s-%s' % (ticket.created.year, ticket.created.month)
elif report == 'queuepriority':
metric1 = u'%s' % ticket.queue.title
metric2 = u'%s' % ticket.get_priority_display()
elif report == 'queuestatus':
metric1 = u'%s' % ticket.queue.title
metric2 = u'%s' % ticket.get_status_display()
elif report == 'queuemonth':
metric1 = u'%s' % ticket.queue.title
metric2 = u'%s-%s' % (ticket.created.year, ticket.created.month)
elif report == 'daysuntilticketclosedbymonth':
metric1 = u'%s' % ticket.queue.title
metric2 = u'%s-%s' % (ticket.created.year, ticket.created.month)
metric3 = ticket.modified - ticket.created
metric3 = metric3.days
summarytable[metric1, metric2] += 1
if metric3:
if report == 'daysuntilticketclosedbymonth':
summarytable2[metric1, metric2] += metric3
table = []
update_summary_tables(report_queryset, report, summarytable, summarytable2)
if report == 'daysuntilticketclosedbymonth':
for key in summarytable2.keys():
summarytable[key] = summarytable2[key] / summarytable[key]
@ -1563,18 +1716,9 @@ def run_report(request, report):
column_headings = [col1heading] + possible_options
# Prepare a dict to store totals for each possible option
totals = {}
table, totals = get_report_table_and_totals(header1, summarytable, possible_options)
# Pivot the data so that 'header1' fields are always first column
# in the row, and 'possible_options' are always the 2nd - nth columns.
for item in header1:
data = []
for hdr in possible_options:
if hdr not in totals.keys():
totals[hdr] = summarytable[item, hdr]
else:
totals[hdr] += summarytable[item, hdr]
data.append(summarytable[item, hdr])
table.append([item] + data)
# Zip data and headers together in one list for Morris.js charts
# will get a list like [(Header1, Data1), (Header2, Data2)...]
@ -1634,8 +1778,8 @@ save_query = staff_member_required(save_query)
@helpdesk_staff_member_required
def delete_saved_query(request, id):
query = get_object_or_404(SavedSearch, id=id, user=request.user)
def delete_saved_query(request, pk):
query = get_object_or_404(SavedSearch, id=pk, user=request.user)
if request.method == 'POST':
query.delete()
@ -1684,8 +1828,8 @@ email_ignore_add = superuser_required(email_ignore_add)
@helpdesk_superuser_required
def email_ignore_del(request, id):
ignore = get_object_or_404(IgnoreEmail, id=id)
def email_ignore_del(request, pk):
ignore = get_object_or_404(IgnoreEmail, id=pk)
if request.method == 'POST':
ignore.delete()
return HttpResponseRedirect(reverse('helpdesk:email_ignore'))

View File

@ -14,7 +14,7 @@ import os
import sys
class QuickDjangoTest(object):
class QuickDjangoTest:
"""
A quick way to run the Django test suite without a fully-configured project.
@ -78,6 +78,7 @@ class QuickDjangoTest(object):
def __init__(self, *args, **kwargs):
self.tests = args
self.kwargs = kwargs or {"verbosity": 1}
self._tests()
def _tests(self):
@ -112,7 +113,7 @@ class QuickDjangoTest(object):
)
from django.test.runner import DiscoverRunner
test_runner = DiscoverRunner(verbosity=1)
test_runner = DiscoverRunner(verbosity=self.kwargs["verbosity"])
django.setup()
failures = test_runner.run_tests(self.tests)
@ -134,7 +135,8 @@ if __name__ == '__main__':
description="Run Django tests."
)
parser.add_argument('tests', nargs="*", type=str)
parser.add_argument("--verbosity", "-v", nargs="?", type=int, default=1)
args = parser.parse_args()
if not args.tests:
args.tests = ['helpdesk']
QuickDjangoTest(*args.tests)
QuickDjangoTest(*args.tests, verbosity=args.verbosity)