Fix format errors

This commit is contained in:
Christopher Broderick 2023-07-23 06:31:29 +01:00
parent f1e1d52cd2
commit 9bbe1945b0

View File

@ -6,8 +6,6 @@ See LICENSE for details.
""" """
# import base64 # import base64
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from datetime import timedelta from datetime import timedelta
from django.conf import settings as django_settings from django.conf import settings as django_settings
@ -19,6 +17,7 @@ from django.utils import encoding, timezone
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
import email import email
from email.message import Message from email.message import Message
from email.mime.text import MIMEText
from email.utils import getaddresses from email.utils import getaddresses
from email_reply_parser import EmailReplyParser from email_reply_parser import EmailReplyParser
from helpdesk import settings from helpdesk import settings
@ -40,7 +39,6 @@ import sys
from time import ctime from time import ctime
import typing import typing
from typing import List from typing import List
from email.mime.text import MIMEText
# import User model, which may be a custom model # import User model, which may be a custom model
@ -78,7 +76,6 @@ def process_email(quiet=False):
logger.propagate = False logger.propagate = False
if quiet: if quiet:
logger.propagate = False # do not propagate to root logger that would log to console logger.propagate = False # do not propagate to root logger that would log to console
# Log messages to specific file only if the queue has it configured # Log messages to specific file only if the queue has it configured
if (q.logging_type in logging_types) and q.logging_dir: # if it's enabled and the dir is set if (q.logging_type in logging_types) and q.logging_dir: # if it's enabled and the dir is set
log_file_handler = logging.FileHandler( log_file_handler = logging.FileHandler(
@ -255,13 +252,11 @@ def imap_oauth_sync(q, logger, server):
) )
server.debug = settings.HELPDESK_IMAP_DEBUG_LEVEL server.debug = settings.HELPDESK_IMAP_DEBUG_LEVEL
# TODO: Perhaps store the authentication string template externally? Settings? Queue Table? # TODO: Perhaps store the authentication string template externally? Settings? Queue Table?
server.authenticate( server.authenticate(
"XOAUTH2", "XOAUTH2",
lambda x: f"user={q.email_box_user}\x01auth=Bearer {token['access_token']}\x01\x01".encode(), lambda x: f"user={q.email_box_user}\x01auth=Bearer {token['access_token']}\x01\x01".encode(),
) )
# Select the Inbound Mailbox folder # Select the Inbound Mailbox folder
server.select(q.email_box_imap_folder) server.select(q.email_box_imap_folder)
@ -315,7 +310,6 @@ def imap_oauth_sync(q, logger, server):
"IMAP retrieve failed. Is the folder '%s' spelled correctly, and does it exist on the server?", "IMAP retrieve failed. Is the folder '%s' spelled correctly, and does it exist on the server?",
q.email_box_imap_folder q.email_box_imap_folder
) )
# Purged Flagged Messages & Logout # Purged Flagged Messages & Logout
server.expunge() server.expunge()
server.close() server.close()
@ -436,7 +430,7 @@ def decodeUnknown(charset, string):
if not charset: if not charset:
try: try:
return str(string, encoding='utf-8', errors='replace') return str(string, encoding='utf-8', errors='replace')
except UnicodeError as e: except UnicodeError:
return str(string, encoding='iso8859-1', errors='replace') return str(string, encoding='iso8859-1', errors='replace')
return str(string, encoding=charset, errors='replace') return str(string, encoding=charset, errors='replace')
return string return string
@ -471,11 +465,10 @@ def is_autoreply(message):
def create_ticket_cc(ticket, cc_list): def create_ticket_cc(ticket, cc_list):
if not cc_list: if not cc_list:
return [] return []
# Local import to deal with non-defined / circular reference problem # Local import to deal with non-defined / circular reference problem
from helpdesk.views.staff import subscribe_to_ticket_updates, User
new_ticket_ccs = [] new_ticket_ccs = []
from helpdesk.views.staff import subscribe_to_ticket_updates, User
for __, cced_email in cc_list: for __, cced_email in cc_list:
cced_email = cced_email.strip() cced_email = cced_email.strip()
@ -541,7 +534,6 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
ticket.merged_to.ticket) ticket.merged_to.ticket)
# Use the ticket in which it was merged to for next operations # Use the ticket in which it was merged to for next operations
ticket = ticket.merged_to ticket = ticket.merged_to
# New issue, create a new <Ticket> instance # New issue, create a new <Ticket> instance
if ticket is None: if ticket is None:
if not settings.QUEUE_EMAIL_BOX_UPDATE_ONLY: if not settings.QUEUE_EMAIL_BOX_UPDATE_ONLY:
@ -558,7 +550,6 @@ def create_object_from_email_message(message, ticket_id, payload, files, logger)
(ticket.queue.slug, ticket.id)) (ticket.queue.slug, ticket.id))
new = True new = True
# Old issue being re-opened # Old issue being re-opened
elif ticket.status == Ticket.CLOSED_STATUS: elif ticket.status == Ticket.CLOSED_STATUS:
ticket.status = Ticket.REOPENED_STATUS ticket.status = Ticket.REOPENED_STATUS
@ -733,7 +724,8 @@ def extract_mime_content(part: Message,) -> str:
''' '''
content_bytes = part.get_payload(decode=True) content_bytes = part.get_payload(decode=True)
charset = part.get_content_charset() charset = part.get_content_charset()
# The default for MIME email is 7bit which requires special decoding to utf-8 so make sure we handle the decoding correctly # The default for MIME email is 7bit which requires special decoding to utf-8 so make sure
# we handle the decoding correctly
if part['Content-Transfer-Encoding'] in [None, '8bit', '7bit'] and (charset == 'utf-8' or charset is None): if part['Content-Transfer-Encoding'] in [None, '8bit', '7bit'] and (charset == 'utf-8' or charset is None):
charset = "unicode_escape" charset = "unicode_escape"
content = decodeUnknown(charset, content_bytes) content = decodeUnknown(charset, content_bytes)
@ -835,7 +827,6 @@ def extract_email_metadata(message: str,
counter = 0 counter = 0
files = [] files = []
first_mime_non_multipart_content: MIMEText = None first_mime_non_multipart_content: MIMEText = None
# Cycle through all MIME parts in the email extracting the plain and formatted messages # Cycle through all MIME parts in the email extracting the plain and formatted messages
# Algorithm uses the first text parts found as the actual email content and subsequent text parts # Algorithm uses the first text parts found as the actual email content and subsequent text parts
# are made into attachments so they do not get lost # are made into attachments so they do not get lost
@ -846,7 +837,8 @@ def extract_email_metadata(message: str,
if part.get_content_disposition() in ['inline', 'attachment']: if part.get_content_disposition() in ['inline', 'attachment']:
process_as_attachment(part, counter, files, logger) process_as_attachment(part, counter, files, logger)
else: else:
# Get the content then assign to plain for formatted email message otherwise store the content as an attachment # Get the content then assign to plain for formatted email message otherwise store the
# content as an attachment
mime_content = extract_mime_content(part) mime_content = extract_mime_content(part)
if first_mime_non_multipart_content is None: if first_mime_non_multipart_content is None:
first_mime_non_multipart_content = mime_content first_mime_non_multipart_content = mime_content
@ -875,14 +867,14 @@ def extract_email_metadata(message: str,
HTML_EMAIL_ATTACHMENT_FILENAME, payload.encode("utf-8"), 'text/html') HTML_EMAIL_ATTACHMENT_FILENAME, payload.encode("utf-8"), 'text/html')
) )
else: else:
# Theoretically should not happen to properly structured emails but process anything else as an attachment # Theoretically should not happen to properly structured emails but process anything
# else as an attachment
process_as_attachment(part, counter, files, logger) process_as_attachment(part, counter, files, logger)
logger.debug(f"Text MIME part added as attachment: {part.get_content_type()}") logger.debug(f"Text MIME part added as attachment: {part.get_content_type()}")
else: else:
# process anything else as an attachment # process anything else as an attachment
process_as_attachment(part, counter, files, logger) process_as_attachment(part, counter, files, logger)
counter += 1 counter += 1
# Check if we have at least the plain body # Check if we have at least the plain body
if not plain_body: if not plain_body:
if formatted_body: if formatted_body:
@ -895,12 +887,11 @@ def extract_email_metadata(message: str,
else: else:
plain_body = message plain_body = message
# first message in thread, we save full body to avoid losing forwards and things like that # first message in thread, we save full body to avoid losing forwards and things like that
include_chained_msgs = True if ticket_id is None and getattr(django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False) else False include_chained_msgs = True if ticket_id is None and getattr(
django_settings, 'HELPDESK_FULL_FIRST_MESSAGE_FROM_EMAIL', False) else False
message_body = extract_email_message(plain_body, True, include_chained_msgs) message_body = extract_email_message(plain_body, True, include_chained_msgs)
# Only need the full message if the message_body excludes the chained messages # Only need the full message if the message_body excludes the chained messages
chained_email_message = None if include_chained_msgs else plain_body chained_email_message = None if include_chained_msgs else plain_body
# Not sure this is valid but a unit test uses a DIFFERENT plain text to html text body # Not sure this is valid but a unit test uses a DIFFERENT plain text to html text body
# where plain text has blank message with forwarded message so.... hack away to support it # where plain text has blank message with forwarded message so.... hack away to support it
if message_body is not None and len(message_body) == 0 and formatted_body and len(formatted_body) > 0: if message_body is not None and len(message_body) == 0 and formatted_body and len(formatted_body) > 0: