Update get_email test to use unicode characters; seem to have found an encoding errory in get_email, provided a fix that satisfies test

This commit is contained in:
Garret Wassermann 2017-04-03 01:53:25 -04:00
parent b5fa489643
commit 74a7afadb9
2 changed files with 28 additions and 39 deletions

View File

@ -159,7 +159,7 @@ def process_queue(q, logger):
msgNum = msg.split(" ")[0]
logger.info("Processing message %s" % msgNum)
full_message = encoding.force_text("\n".join(server.retr(msgNum)[1]), errors='ignore')
full_message = encoding.force_text("\n".join(server.retr(msgNum)[1]), errors='replace')
ticket = ticket_from_message(message=full_message, queue=q, logger=logger)
if ticket:
@ -199,7 +199,7 @@ def process_queue(q, logger):
for num in msgnums:
logger.info("Processing message %s" % num)
status, data = server.fetch(num, '(RFC822)')
full_message = encoding.force_text(data[0][1], errors='ignore')
full_message = encoding.force_text(data[0][1], errors='replace')
ticket = ticket_from_message(message=full_message, queue=q, logger=logger)
if ticket:
server.store(num, '+FLAGS', '\\Deleted')
@ -220,7 +220,7 @@ def process_queue(q, logger):
for i, m in enumerate(mail, 1):
logger.info("Processing message %d" % i)
with open(m, 'r') as f:
full_message = encoding.force_text(f.read(), errors='ignore')
full_message = encoding.force_text(f.read(), errors='replace')
ticket = ticket_from_message(message=full_message, queue=q, logger=logger)
if ticket:
logger.info("Successfully processed message %d, ticket/comment created." % i)
@ -238,18 +238,18 @@ def decodeUnknown(charset, string):
if six.PY2:
if not charset:
try:
return string.decode('utf-8', 'ignore')
return string.decode('utf-8', 'replace')
except:
return string.decode('iso8859-1', 'ignore')
return string.decode('iso8859-1', 'replace')
return unicode(string, charset)
elif six.PY3:
if type(string) is not str:
if not charset:
try:
return str(string, encoding='utf-8', errors='ignore')
return str(string, encoding='utf-8', errors='replace')
except:
return str(string, encoding='iso8859-1', errors='ignore')
return str(string, encoding=charset, errors='ignore')
return str(string, encoding='iso8859-1', errors='replace')
return str(string, encoding=charset, errors='replace')
return string
@ -258,7 +258,7 @@ def decode_mail_headers(string):
if six.PY2:
return u' '.join([unicode(msg, charset or 'utf-8') for msg, charset in decoded])
elif six.PY3:
return u' '.join([str(msg, encoding=charset, errors='ignore') if charset else str(msg) for msg, charset in decoded])
return u' '.join([str(msg, encoding=charset, errors='replace') if charset else str(msg) for msg, charset in decoded])
def ticket_from_message(message, queue, logger):
@ -305,9 +305,11 @@ def ticket_from_message(message, queue, logger):
if part.get_content_maintype() == 'text' and name is None:
if part.get_content_subtype() == 'plain':
body = encoding.force_text(EmailReplyParser.parse_reply(
decodeUnknown(part.get_content_charset(), encoding.force_text(part.get_payload(decode=True)))
))
body = EmailReplyParser.parse_reply(
decodeUnknown(part.get_content_charset(), part.get_payload(decode=True))
)
# workaround to get unicode text out rather than escaped text
body = body.encode('ascii').decode('unicode_escape')
logger.debug("Discovered plain text MIME part")
else:
files.append(

View File

@ -74,15 +74,17 @@ class GetEmailParametricTemplate(object):
rmtree(self.temp_logdir)
def test_read_email(self):
"""Tests reading emails from a queue and creating tickets.
def test_read_plain_email(self):
"""Tests reading plain text emails from a queue and creating tickets.
For each email source supported, we mock the backend to provide
authentically formatted responses containing our test data."""
test_email = "To: update.public@example.com\nFrom: comment@example.com\nSubject: Some Comment\n\nThis is the helpdesk comment via email."
test_mail_len = len(test_email)
test_unicode_email = "To: update.public@example.com\nFrom: comment@example.com\nSubject: Some Unicode Comment\n\nThis is the helpdesk comment via email with unicode chars \u2013 inserted for testing purposes."
test_unicode_email_len = len(test_unicode_email)
# example email text from Django docs: https://docs.djangoproject.com/en/1.10/ref/unicode/
test_email_from = "Arnbjörg Ráðormsdóttir <arnbjorg@example.com>"
test_email_subject = "My visit to Sør-Trøndelag"
test_email_body = "Unicode helpdesk comment with an s-hat (ŝ) via email."
test_email = "To: helpdesk@example.com\nFrom: " + test_email_from + "\nSubject: " + test_email_subject + "\n\n" + test_email_body
test_mail_len = len(test_email)
if self.socks:
from socks import ProxyConnectionError
@ -104,25 +106,13 @@ class GetEmailParametricTemplate(object):
mocked_isfile.assert_any_call('/var/lib/mail/helpdesk/filename1')
mocked_isfile.assert_any_call('/var/lib/mail/helpdesk/filename2')
with mock.patch('helpdesk.management.commands.get_email.listdir') as mocked_listdir, \
mock.patch('helpdesk.management.commands.get_email.isfile') as mocked_isfile, \
mock.patch('builtins.open' if six.PY3 else '__builtin__.open', mock.mock_open(read_data=test_unicode_email)):
mocked_isfile.return_value = True
mocked_listdir.return_value = ['filename3']
call_command('get_email')
mocked_listdir.assert_called_with('/var/lib/mail/helpdesk/')
mocked_isfile.assert_any_call('/var/lib/mail/helpdesk/filename3')
elif self.method == 'pop3':
# mock poplib.POP3's list and retr methods to provide responses as per RFC 1939
pop3_emails = {
'1': ("+OK", test_email.split('\n')),
'2': ("+OK", test_email.split('\n')),
'3': ("+OK", test_unicode_email.split('\n')),
}
pop3_mail_list = ("+OK 3 messages", ("1 %d" % test_mail_len, "2 %d" % test_mail_len, "3 %d" % test_unicode_email_len))
pop3_mail_list = ("+OK 2 messages", ("1 %d" % test_mail_len, "2 %d" % test_mail_len))
mocked_poplib_server = mock.Mock()
mocked_poplib_server.list = mock.Mock(return_value=pop3_mail_list)
mocked_poplib_server.retr = mock.Mock(side_effect=lambda x: pop3_emails[x])
@ -135,9 +125,8 @@ class GetEmailParametricTemplate(object):
imap_emails = {
"1": ("OK", (("1", test_email),)),
"2": ("OK", (("2", test_email),)),
"3": ("OK", (("3", test_unicode_email),)),
}
imap_mail_list = ("OK", ("1 2 3",))
imap_mail_list = ("OK", ("1 2",))
mocked_imaplib_server = mock.Mock()
mocked_imaplib_server.search = mock.Mock(return_value=imap_mail_list)
@ -149,15 +138,13 @@ class GetEmailParametricTemplate(object):
ticket1 = get_object_or_404(Ticket, pk=1)
self.assertEqual(ticket1.ticket_for_url, "QQ-%s" % ticket1.id)
self.assertEqual(ticket1.description, "This is the helpdesk comment via email.")
self.assertEqual(ticket1.title, test_email_subject)
self.assertEqual(ticket1.description, test_email_body)
ticket2 = get_object_or_404(Ticket, pk=2)
self.assertEqual(ticket2.ticket_for_url, "QQ-%s" % ticket2.id)
self.assertEqual(ticket2.description, "This is the helpdesk comment via email.")
ticket3 = get_object_or_404(Ticket, pk=3)
self.assertEqual(ticket3.ticket_for_url, "QQ-%s" % ticket3.id)
self.assertEqual(ticket3.description, "This is the helpdesk comment via email with unicode chars \u2013 inserted for testing purposes.")
self.assertEqual(ticket2.title, test_email_subject)
self.assertEqual(ticket2.description, test_email_body)
# build matrix of test cases