from base64 import b64decode, b64encode from django.db.models import Q, Max from django.db.models import F, Window, Subquery, OuterRef from .models import FollowUp from django.urls import reverse from django.utils.html import escape from django.utils.translation import gettext as _ from helpdesk.serializers import DatatablesTicketSerializer import json from model_utils import Choices def query_to_base64(query): """ Converts a query dict object to a base64-encoded bytes object. """ return b64encode(json.dumps(query).encode("UTF-8")).decode("ascii") def query_from_base64(b64data): """ Converts base64-encoded bytes object back to a query dict object. """ query = {"search_string": ""} query.update(json.loads(b64decode(b64data).decode("utf-8"))) if query["search_string"] is None: query["search_string"] = "" return query def get_search_filter_args(search): if not search: return Q() if search.startswith("queue:"): return Q(queue__title__icontains=search[len("queue:") :]) if search.startswith("priority:"): return Q(priority__icontains=search[len("priority:") :]) my_filter = Q() for subsearch in search.split("OR"): subsearch = subsearch.strip() if not subsearch: continue my_filter = ( filter | Q(id__icontains=subsearch) | Q(title__icontains=subsearch) | Q(description__icontains=subsearch) | Q(priority__icontains=subsearch) | Q(resolution__icontains=subsearch) | Q(submitter_email__icontains=subsearch) | Q(assigned_to__email__icontains=subsearch) | Q(ticketcustomfieldvalue__value__icontains=subsearch) | Q(created__icontains=subsearch) | Q(due_date__icontains=subsearch) ) return my_filter DATATABLES_ORDER_COLUMN_CHOICES = Choices( ("0", "id"), ("1", "title"), ("2", "priority"), ("3", "queue"), ("4", "status"), ("5", "created"), ("6", "due_date"), ("7", "assigned_to"), ("8", "submitter_email"), ("9", "last_followup"), # ('10', 'time_spent'), ("11", "kbitem"), ) def get_query_class(): from django.conf import settings def _get_query_class(): return __Query__ return getattr(settings, "HELPDESK_QUERY_CLASS", _get_query_class)() class __Query__: def __init__(self, huser, base64query=None, query_params=None): self.huser = huser self.params = query_params if query_params else query_from_base64(base64query) self.base64 = base64query if base64query else query_to_base64(query_params) self.result = None def get_search_filter_args(self): search = self.params.get("search_string", "") return get_search_filter_args(search) def __run__(self, queryset): """ Apply a dict-based set of value_filters & parameters to a queryset. queryset is a Django queryset, eg MyModel.objects.all() or MyModel.objects.filter(user=request.user) params is a dictionary that contains the following: filtering: A dict of Django ORM value_filters, eg: {'user__id__in': [1, 3, 103], 'title__contains': 'foo'} search_string: A freetext search string sorting: The name of the column to sort by """ q_args = [] value_filters = self.params.get("filtering", {}) null_filters = self.params.get("filtering_null", {}) if null_filters: if value_filters: # Check if any of the value value_filters are for the same field as the # ISNULL filter so that an OR filter can be set up matched_null_keys = [] for null_key in null_filters: field_path = null_key[:-8] # Chop off the "__isnull" matched_key = None for val_key in value_filters: if val_key.startswith(field_path): matched_key = val_key break if matched_key: # Remove the matching filters into a Q param matched_null_keys.append(null_key) # Create an OR query for the selected value(s) OR if the field is NULL v = {} v[val_key] = value_filters[val_key] n = {} n[null_key] = null_filters[null_key] q_args.append((Q(**v) | Q(**n))) del value_filters[matched_key] # Now remove the matched null keys for null_key in matched_null_keys: del null_filters[null_key] queryset = queryset.filter( *q_args, (Q(**value_filters) & Q(**null_filters)) & self.get_search_filter_args(), ) sorting = self.params.get("sorting", None) if sorting: sortreverse = self.params.get("sortreverse", None) if sortreverse: sorting = "-%s" % sorting queryset = queryset.order_by(sorting) # https://stackoverflow.com/questions/30487056/django-queryset-contains-duplicate-entries return queryset.distinct() def get(self): # Prefilter the allowed tickets tickets = self.huser.get_tickets_in_queues().select_related() return self.__run__(tickets) def get_datatables_context(self, **kwargs): """ This function takes in a list of ticket objects from the views and throws it to the datatables on ticket_list.html. If a search string was entered, this function filters existing dataset on search string and returns a filtered filtered list. The `draw`, `length` etc parameters are for datatables to display meta data on the table contents. The returning queryset is passed to a Serializer called DatatablesTicketSerializer in serializers.py. """ objects = self.get() order_by = "-created" draw = int(kwargs.get("draw", [0])[0]) length = int(kwargs.get("length", [25])[0]) start = int(kwargs.get("start", [0])[0]) search_value = kwargs.get("search[value]", [""])[0] order_column = kwargs.get("order[0][column]", ["5"])[0] order = kwargs.get("order[0][dir]", ["asc"])[0] order_column = DATATABLES_ORDER_COLUMN_CHOICES[order_column] # django orm '-' -> desc if order == "desc": order_column = "-" + order_column queryset = objects.annotate( last_followup=Subquery( FollowUp.objects.order_by() .annotate( last_followup=Window( expression=Max("date"), partition_by=[ F("ticket_id"), ], order_by="-date", ) ) .filter(ticket_id=OuterRef("id")) .values("last_followup") .distinct() ) ).order_by(order_by) total = queryset.count() if search_value: # Dead code currently queryset = queryset.filter(get_search_filter_args(search_value)) count = queryset.count() queryset = queryset.order_by(order_column)[start : start + length] return { "data": DatatablesTicketSerializer(queryset, many=True).data, "recordsFiltered": count, "recordsTotal": total, "draw": draw, } def get_timeline_context(self): events = [] for ticket in self.get(): for followup in ticket.followup_set.all(): event = { "start_date": self.mk_timeline_date(followup.date), "text": { "headline": ticket.title + " - " + followup.title, "text": ( ( escape(followup.comment) if followup.comment else _("No text") ) + '
%s' % ( reverse( "helpdesk:view", kwargs={"ticket_id": ticket.pk} ), _("View ticket"), ) ), }, "group": _("Messages"), } events.append(event) return { "events": events, } def mk_timeline_date(self, date): return { "year": date.year, "month": date.month, "day": date.day, "hour": date.hour, "minute": date.minute, "second": date.second, }