Merge pull request #746 from CedricCarrard/fix/issue-#505

Fixed #505 -- Remove infinite loop with user_passes_test
This commit is contained in:
Garret Wassermann 2019-03-20 13:17:22 -04:00 committed by GitHub
commit 671cdd19c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 126 additions and 107 deletions

View File

@ -1,9 +1,12 @@
from functools import wraps from functools import wraps
from django.urls import reverse from django.core.exceptions import PermissionDenied
from django.http import HttpResponseRedirect, Http404 from django.http import Http404
from django.shortcuts import redirect
from django.utils.decorators import available_attrs from django.utils.decorators import available_attrs
from helpdesk import settings as helpdesk_settings from helpdesk import settings as helpdesk_settings
@ -15,9 +18,41 @@ def protect_view(view_func):
@wraps(view_func, assigned=available_attrs(view_func)) @wraps(view_func, assigned=available_attrs(view_func))
def _wrapped_view(request, *args, **kwargs): def _wrapped_view(request, *args, **kwargs):
if not request.user.is_authenticated and helpdesk_settings.HELPDESK_REDIRECT_TO_LOGIN_BY_DEFAULT: if not request.user.is_authenticated and helpdesk_settings.HELPDESK_REDIRECT_TO_LOGIN_BY_DEFAULT:
return HttpResponseRedirect(reverse('helpdesk:login')) return redirect('helpdesk:login')
elif not request.user.is_authenticated and helpdesk_settings.HELPDESK_ANON_ACCESS_RAISES_404: elif not request.user.is_authenticated and helpdesk_settings.HELPDESK_ANON_ACCESS_RAISES_404:
raise Http404 raise Http404
return view_func(request, *args, **kwargs) return view_func(request, *args, **kwargs)
return _wrapped_view return _wrapped_view
def staff_member_required(view_func):
"""
Decorator for staff member the views checking user, redirecting
to the log-in page if necessary or returning 403
"""
@wraps(view_func, assigned=available_attrs(view_func))
def _wrapped_view(request, *args, **kwargs):
if not request.user.is_authenticated and not request.user.is_active:
return redirect('helpdesk:login')
if not helpdesk_settings.HELPDESK_ALLOW_NON_STAFF_TICKET_UPDATE and not request.user.is_staff:
raise PermissionDenied()
return view_func(request, *args, **kwargs)
return _wrapped_view
def superuser_required(view_func):
"""
Decorator for superuser member the views checking user, redirecting
to the log-in page if necessary or returning 403
"""
@wraps(view_func, assigned=available_attrs(view_func))
def _wrapped_view(request, *args, **kwargs):
if not request.user.is_authenticated and not request.user.is_active:
return redirect('helpdesk:login')
if not request.user.is_superuser:
raise PermissionDenied()
return view_func(request, *args, **kwargs)
return _wrapped_view

View File

@ -5,12 +5,18 @@ from django.contrib.auth import get_user_model
User = get_user_model() User = get_user_model()
def get_staff_user(username='helpdesk.staff', password='password'): def get_user(username='helpdesk.staff',
password='password',
is_staff=False,
is_superuser=False):
try: try:
user = User.objects.get(username=username) user = User.objects.get(username=username)
except User.DoesNotExist: except User.DoesNotExist:
user = User.objects.create_user(username=username, password=password, email='staff@example.com') user = User.objects.create_user(username=username,
user.is_staff = True password=password,
email='%s@example.com' % username)
user.is_staff = is_staff
user.is_superuser = is_superuser
user.save() user.save()
else: else:
user.set_password(password) user.set_password(password)

View File

@ -3,7 +3,7 @@ from django.urls import reverse
from django.test import TestCase from django.test import TestCase
from helpdesk.models import KBCategory from helpdesk.models import KBCategory
from helpdesk.tests.helpers import get_staff_user, reload_urlconf from helpdesk.tests.helpers import get_user, reload_urlconf
class TestKBDisabled(TestCase): class TestKBDisabled(TestCase):
@ -26,7 +26,7 @@ class TestKBDisabled(TestCase):
"""Test proper rendering of navigation.html by accessing the dashboard""" """Test proper rendering of navigation.html by accessing the dashboard"""
from django.urls import NoReverseMatch from django.urls import NoReverseMatch
self.client.login(username=get_staff_user().get_username(), password='password') self.client.login(username=get_user(is_staff=True).get_username(), password='password')
self.assertRaises(NoReverseMatch, reverse, 'helpdesk:kb_index') self.assertRaises(NoReverseMatch, reverse, 'helpdesk:kb_index')
try: try:
response = self.client.get(reverse('helpdesk:dashboard')) response = self.client.get(reverse('helpdesk:dashboard'))
@ -45,3 +45,48 @@ class TestKBDisabled(TestCase):
response = self.client.get(reverse('helpdesk:home')) response = self.client.get(reverse('helpdesk:home'))
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'helpdesk/public_homepage.html') self.assertTemplateUsed(response, 'helpdesk/public_homepage.html')
class TestDecorator(TestCase):
def test_staff_member_restrictions(self):
user = get_user(username='helpdesk.user',
password='password')
self.client.login(username=user.get_username(),
password='password')
response = self.client.get(reverse('helpdesk:list'))
self.assertEqual(response.status_code, 403)
def test_staff_member_access(self):
user = get_user(username='helpdesk.user',
password='password',
is_staff=True)
self.client.login(username=user.get_username(),
password='password')
response = self.client.get(reverse('helpdesk:list'))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'helpdesk/ticket_list.html')
def test_superuser_member_restrictions(self):
user = get_user(username='helpdesk.superuser',
password='password',
is_staff=True)
self.client.login(username=user.get_username(),
password='password')
response = self.client.get(reverse('helpdesk:email_ignore'))
self.assertEqual(response.status_code, 403)
def test_superuser_member_access(self):
user = get_user(username='helpdesk.superuser',
password='password',
is_staff=True,
is_superuser=True)
self.client.login(username=user.get_username(),
password='password')
response = self.client.get(reverse('helpdesk:email_ignore'))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'helpdesk/email_ignore_list.html')

View File

@ -2,7 +2,7 @@
from django.urls import reverse from django.urls import reverse
from django.test import TestCase from django.test import TestCase
from helpdesk.models import Queue from helpdesk.models import Queue
from helpdesk.tests.helpers import get_staff_user from helpdesk.tests.helpers import get_user
class TestSavingSharedQuery(TestCase): class TestSavingSharedQuery(TestCase):
@ -14,7 +14,7 @@ class TestSavingSharedQuery(TestCase):
def test_cansavequery(self): def test_cansavequery(self):
"""Can a query be saved""" """Can a query be saved"""
url = reverse('helpdesk:savequery') url = reverse('helpdesk:savequery')
self.client.login(username=get_staff_user().get_username(), self.client.login(username=get_user(is_staff=True).get_username(),
password='password') password='password')
response = self.client.post( response = self.client.post(
url, url,

View File

@ -13,10 +13,8 @@ import re
from django import VERSION as DJANGO_VERSION from django import VERSION as DJANGO_VERSION
from django.conf import settings from django.conf import settings
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import user_passes_test
from django.urls import reverse from django.urls import reverse
from django.core.exceptions import ValidationError, PermissionDenied from django.core.exceptions import ValidationError, PermissionDenied
from django.db import connection
from django.db.models import Q from django.db.models import Q
from django.http import HttpResponseRedirect, Http404, HttpResponse from django.http import HttpResponseRedirect, Http404, HttpResponse
from django.shortcuts import render, get_object_or_404 from django.shortcuts import render, get_object_or_404
@ -32,8 +30,9 @@ from helpdesk.forms import (
TicketForm, UserSettingsForm, EmailIgnoreForm, EditTicketForm, TicketCCForm, TicketForm, UserSettingsForm, EmailIgnoreForm, EditTicketForm, TicketCCForm,
TicketCCEmailForm, TicketCCUserForm, EditFollowUpForm, TicketDependencyForm TicketCCEmailForm, TicketCCUserForm, EditFollowUpForm, TicketDependencyForm
) )
from helpdesk.decorators import staff_member_required, superuser_required
from helpdesk.lib import ( from helpdesk.lib import (
send_templated_mail, query_to_dict, apply_query, safe_template_context, send_templated_mail, apply_query, safe_template_context,
process_attachments, queue_template_context, process_attachments, queue_template_context,
) )
from helpdesk.models import ( from helpdesk.models import (
@ -42,22 +41,10 @@ from helpdesk.models import (
) )
from helpdesk import settings as helpdesk_settings from helpdesk import settings as helpdesk_settings
User = get_user_model() User = get_user_model()
if helpdesk_settings.HELPDESK_ALLOW_NON_STAFF_TICKET_UPDATE:
# treat 'normal' users like 'staff'
staff_member_required = user_passes_test(
lambda u: u.is_authenticated and u.is_active)
else:
staff_member_required = user_passes_test(
lambda u: u.is_authenticated and u.is_active and u.is_staff)
superuser_required = user_passes_test(
lambda u: u.is_authenticated and u.is_active and u.is_superuser)
def _get_user_queues(user): def _get_user_queues(user):
"""Return the list of Queues the user can access. """Return the list of Queues the user can access.
@ -97,6 +84,7 @@ def _is_my_ticket(user, ticket):
return False return False
@staff_member_required
def dashboard(request): def dashboard(request):
""" """
A quick summary overview for users: A list of their own tickets, a table A quick summary overview for users: A list of their own tickets, a table
@ -162,9 +150,7 @@ def dashboard(request):
}) })
dashboard = staff_member_required(dashboard) @staff_member_required
def delete_ticket(request, ticket_id): def delete_ticket(request, ticket_id):
ticket = get_object_or_404(Ticket, id=ticket_id) ticket = get_object_or_404(Ticket, id=ticket_id)
if not _has_access_to_queue(request.user, ticket.queue): if not _has_access_to_queue(request.user, ticket.queue):
@ -181,9 +167,7 @@ def delete_ticket(request, ticket_id):
return HttpResponseRedirect(reverse('helpdesk:home')) return HttpResponseRedirect(reverse('helpdesk:home'))
delete_ticket = staff_member_required(delete_ticket) @staff_member_required
def followup_edit(request, ticket_id, followup_id): def followup_edit(request, ticket_id, followup_id):
"""Edit followup options with an ability to change the ticket.""" """Edit followup options with an ability to change the ticket."""
followup = get_object_or_404(FollowUp, id=followup_id) followup = get_object_or_404(FollowUp, id=followup_id)
@ -236,9 +220,7 @@ def followup_edit(request, ticket_id, followup_id):
return HttpResponseRedirect(reverse('helpdesk:view', args=[ticket.id])) return HttpResponseRedirect(reverse('helpdesk:view', args=[ticket.id]))
followup_edit = staff_member_required(followup_edit) @staff_member_required
def followup_delete(request, ticket_id, followup_id): def followup_delete(request, ticket_id, followup_id):
"""followup delete for superuser""" """followup delete for superuser"""
@ -251,9 +233,7 @@ def followup_delete(request, ticket_id, followup_id):
return HttpResponseRedirect(reverse('helpdesk:view', args=[ticket.id])) return HttpResponseRedirect(reverse('helpdesk:view', args=[ticket.id]))
followup_delete = staff_member_required(followup_delete) @staff_member_required
def view_ticket(request, ticket_id): def view_ticket(request, ticket_id):
ticket = get_object_or_404(Ticket, id=ticket_id) ticket = get_object_or_404(Ticket, id=ticket_id)
if not _has_access_to_queue(request.user, ticket.queue): if not _has_access_to_queue(request.user, ticket.queue):
@ -323,9 +303,6 @@ def view_ticket(request, ticket_id):
}) })
view_ticket = staff_member_required(view_ticket)
def return_ticketccstring_and_show_subscribe(user, ticket): def return_ticketccstring_and_show_subscribe(user, ticket):
"""used in view_ticket() and followup_edit()""" """used in view_ticket() and followup_edit()"""
# create the ticketcc_string and check whether current user is already # create the ticketcc_string and check whether current user is already
@ -666,6 +643,7 @@ def return_to_ticket(user, helpdesk_settings, ticket):
return HttpResponseRedirect(ticket.ticket_url) return HttpResponseRedirect(ticket.ticket_url)
@staff_member_required
def mass_update(request): def mass_update(request):
tickets = request.POST.getlist('ticket_id') tickets = request.POST.getlist('ticket_id')
action = request.POST.get('action', None) action = request.POST.get('action', None)
@ -781,9 +759,7 @@ def mass_update(request):
return HttpResponseRedirect(reverse('helpdesk:list')) return HttpResponseRedirect(reverse('helpdesk:list'))
mass_update = staff_member_required(mass_update) @staff_member_required
def ticket_list(request): def ticket_list(request):
context = {} context = {}
@ -970,9 +946,7 @@ def ticket_list(request):
)) ))
ticket_list = staff_member_required(ticket_list) @staff_member_required
def edit_ticket(request, ticket_id): def edit_ticket(request, ticket_id):
ticket = get_object_or_404(Ticket, id=ticket_id) ticket = get_object_or_404(Ticket, id=ticket_id)
if not _has_access_to_queue(request.user, ticket.queue): if not _has_access_to_queue(request.user, ticket.queue):
@ -991,9 +965,7 @@ def edit_ticket(request, ticket_id):
return render(request, 'helpdesk/edit_ticket.html', {'form': form}) return render(request, 'helpdesk/edit_ticket.html', {'form': form})
edit_ticket = staff_member_required(edit_ticket) @staff_member_required
def create_ticket(request): def create_ticket(request):
if helpdesk_settings.HELPDESK_STAFF_ONLY_TICKET_OWNERS: if helpdesk_settings.HELPDESK_STAFF_ONLY_TICKET_OWNERS:
assignable_users = User.objects.filter(is_active=True, is_staff=True).order_by(User.USERNAME_FIELD) assignable_users = User.objects.filter(is_active=True, is_staff=True).order_by(User.USERNAME_FIELD)
@ -1030,9 +1002,7 @@ def create_ticket(request):
return render(request, 'helpdesk/create_ticket.html', {'form': form}) return render(request, 'helpdesk/create_ticket.html', {'form': form})
create_ticket = staff_member_required(create_ticket) @staff_member_required
def raw_details(request, type): def raw_details(request, type):
# TODO: This currently only supports spewing out 'PreSetReply' objects, # TODO: This currently only supports spewing out 'PreSetReply' objects,
# in the future it needs to be expanded to include other items. All it # in the future it needs to be expanded to include other items. All it
@ -1051,9 +1021,7 @@ def raw_details(request, type):
raise Http404 raise Http404
raw_details = staff_member_required(raw_details) @staff_member_required
def hold_ticket(request, ticket_id, unhold=False): def hold_ticket(request, ticket_id, unhold=False):
ticket = get_object_or_404(Ticket, id=ticket_id) ticket = get_object_or_404(Ticket, id=ticket_id)
if not _has_access_to_queue(request.user, ticket.queue): if not _has_access_to_queue(request.user, ticket.queue):
@ -1082,23 +1050,17 @@ def hold_ticket(request, ticket_id, unhold=False):
return HttpResponseRedirect(ticket.get_absolute_url()) return HttpResponseRedirect(ticket.get_absolute_url())
hold_ticket = staff_member_required(hold_ticket) @staff_member_required
def unhold_ticket(request, ticket_id): def unhold_ticket(request, ticket_id):
return hold_ticket(request, ticket_id, unhold=True) return hold_ticket(request, ticket_id, unhold=True)
unhold_ticket = staff_member_required(unhold_ticket) @staff_member_required
def rss_list(request): def rss_list(request):
return render(request, 'helpdesk/rss_list.html', {'queues': Queue.objects.all()}) return render(request, 'helpdesk/rss_list.html', {'queues': Queue.objects.all()})
rss_list = staff_member_required(rss_list) @staff_member_required
def report_index(request): def report_index(request):
number_tickets = Ticket.objects.all().count() number_tickets = Ticket.objects.all().count()
saved_query = request.GET.get('saved_query', None) saved_query = request.GET.get('saved_query', None)
@ -1133,9 +1095,7 @@ def report_index(request):
}) })
report_index = staff_member_required(report_index) @staff_member_required
def run_report(request, report): def run_report(request, report):
if Ticket.objects.all().count() == 0 or report not in ( if Ticket.objects.all().count() == 0 or report not in (
'queuemonth', 'usermonth', 'queuestatus', 'queuepriority', 'userstatus', 'queuemonth', 'usermonth', 'queuestatus', 'queuepriority', 'userstatus',
@ -1339,9 +1299,7 @@ def run_report(request, report):
}) })
run_report = staff_member_required(run_report) @staff_member_required
def save_query(request): def save_query(request):
title = request.POST.get('title', None) title = request.POST.get('title', None)
shared = request.POST.get('shared', False) shared = request.POST.get('shared', False)
@ -1358,9 +1316,7 @@ def save_query(request):
return HttpResponseRedirect('%s?saved_query=%s' % (reverse('helpdesk:list'), query.id)) return HttpResponseRedirect('%s?saved_query=%s' % (reverse('helpdesk:list'), query.id))
save_query = staff_member_required(save_query) @staff_member_required
def delete_saved_query(request, id): def delete_saved_query(request, id):
query = get_object_or_404(SavedSearch, id=id, user=request.user) query = get_object_or_404(SavedSearch, id=id, user=request.user)
@ -1371,9 +1327,7 @@ def delete_saved_query(request, id):
return render(request, 'helpdesk/confirm_delete_saved_query.html', {'query': query}) return render(request, 'helpdesk/confirm_delete_saved_query.html', {'query': query})
delete_saved_query = staff_member_required(delete_saved_query) @staff_member_required
def user_settings(request): def user_settings(request):
s = request.user.usersettings_helpdesk s = request.user.usersettings_helpdesk
if request.POST: if request.POST:
@ -1387,18 +1341,14 @@ def user_settings(request):
return render(request, 'helpdesk/user_settings.html', {'form': form}) return render(request, 'helpdesk/user_settings.html', {'form': form})
user_settings = staff_member_required(user_settings) @superuser_required
def email_ignore(request): def email_ignore(request):
return render(request, 'helpdesk/email_ignore_list.html', { return render(request, 'helpdesk/email_ignore_list.html', {
'ignore_list': IgnoreEmail.objects.all(), 'ignore_list': IgnoreEmail.objects.all(),
}) })
email_ignore = superuser_required(email_ignore) @superuser_required
def email_ignore_add(request): def email_ignore_add(request):
if request.method == 'POST': if request.method == 'POST':
form = EmailIgnoreForm(request.POST) form = EmailIgnoreForm(request.POST)
@ -1411,9 +1361,7 @@ def email_ignore_add(request):
return render(request, 'helpdesk/email_ignore_add.html', {'form': form}) return render(request, 'helpdesk/email_ignore_add.html', {'form': form})
email_ignore_add = superuser_required(email_ignore_add) @superuser_required
def email_ignore_del(request, id): def email_ignore_del(request, id):
ignore = get_object_or_404(IgnoreEmail, id=id) ignore = get_object_or_404(IgnoreEmail, id=id)
if request.method == 'POST': if request.method == 'POST':
@ -1423,9 +1371,7 @@ def email_ignore_del(request, id):
return render(request, 'helpdesk/email_ignore_del.html', {'ignore': ignore}) return render(request, 'helpdesk/email_ignore_del.html', {'ignore': ignore})
email_ignore_del = superuser_required(email_ignore_del) @staff_member_required
def ticket_cc(request, ticket_id): def ticket_cc(request, ticket_id):
ticket = get_object_or_404(Ticket, id=ticket_id) ticket = get_object_or_404(Ticket, id=ticket_id)
if not _has_access_to_queue(request.user, ticket.queue): if not _has_access_to_queue(request.user, ticket.queue):
@ -1440,9 +1386,7 @@ def ticket_cc(request, ticket_id):
}) })
ticket_cc = staff_member_required(ticket_cc) @staff_member_required
def ticket_cc_add(request, ticket_id): def ticket_cc_add(request, ticket_id):
ticket = get_object_or_404(Ticket, id=ticket_id) ticket = get_object_or_404(Ticket, id=ticket_id)
if not _has_access_to_queue(request.user, ticket.queue): if not _has_access_to_queue(request.user, ticket.queue):
@ -1468,9 +1412,7 @@ def ticket_cc_add(request, ticket_id):
}) })
ticket_cc_add = staff_member_required(ticket_cc_add) @staff_member_required
def ticket_cc_del(request, ticket_id, cc_id): def ticket_cc_del(request, ticket_id, cc_id):
cc = get_object_or_404(TicketCC, ticket__id=ticket_id, id=cc_id) cc = get_object_or_404(TicketCC, ticket__id=ticket_id, id=cc_id)
@ -1481,9 +1423,7 @@ def ticket_cc_del(request, ticket_id, cc_id):
return render(request, 'helpdesk/ticket_cc_del.html', {'cc': cc}) return render(request, 'helpdesk/ticket_cc_del.html', {'cc': cc})
ticket_cc_del = staff_member_required(ticket_cc_del) @staff_member_required
def ticket_dependency_add(request, ticket_id): def ticket_dependency_add(request, ticket_id):
ticket = get_object_or_404(Ticket, id=ticket_id) ticket = get_object_or_404(Ticket, id=ticket_id)
if not _has_access_to_queue(request.user, ticket.queue): if not _has_access_to_queue(request.user, ticket.queue):
@ -1506,9 +1446,7 @@ def ticket_dependency_add(request, ticket_id):
}) })
ticket_dependency_add = staff_member_required(ticket_dependency_add) @staff_member_required
def ticket_dependency_del(request, ticket_id, dependency_id): def ticket_dependency_del(request, ticket_id, dependency_id):
dependency = get_object_or_404(TicketDependency, ticket__id=ticket_id, id=dependency_id) dependency = get_object_or_404(TicketDependency, ticket__id=ticket_id, id=dependency_id)
if request.method == 'POST': if request.method == 'POST':
@ -1517,9 +1455,7 @@ def ticket_dependency_del(request, ticket_id, dependency_id):
return render(request, 'helpdesk/ticket_dependency_del.html', {'dependency': dependency}) return render(request, 'helpdesk/ticket_dependency_del.html', {'dependency': dependency})
ticket_dependency_del = staff_member_required(ticket_dependency_del) @staff_member_required
def attachment_del(request, ticket_id, attachment_id): def attachment_del(request, ticket_id, attachment_id):
ticket = get_object_or_404(Ticket, id=ticket_id) ticket = get_object_or_404(Ticket, id=ticket_id)
if not _has_access_to_queue(request.user, ticket.queue): if not _has_access_to_queue(request.user, ticket.queue):
@ -1537,9 +1473,6 @@ def attachment_del(request, ticket_id, attachment_id):
}) })
attachment_del = staff_member_required(attachment_del)
def calc_average_nbr_days_until_ticket_resolved(Tickets): def calc_average_nbr_days_until_ticket_resolved(Tickets):
nbr_closed_tickets = len(Tickets) nbr_closed_tickets = len(Tickets)
days_per_ticket = 0 days_per_ticket = 0