Guest User

Untitled

a guest
Aug 11th, 2018
558
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.55 KB | None | 0 0
  1. import sys
  2. import email
  3. from django.contrib.auth.models import User
  4. from tddspry.django import DatabaseTestCase
  5. from cryemail.mail.utils.folders import retrieve_folders_for_user, \
  6. prepare_folder_name
  7. from cryemail.mail.utils.messages import retrieve_messages_for_user
  8. from cryemail.mail.utils import values_by_headers
  9. from cryemail.mail.utils import imap_utf7
  10. from cryemail.mail.models import Folder, Message
  11.  
  12. import re
  13.  
  14. ADMIN_LOGIN = 'admin'
  15. ADMIN_PASSWORD = 'admin'
  16.  
  17. TEST_GMAIL_HOST = "imap.gmail.com"
  18. TEST_GMAIL_PORT = 993
  19. TEST_GMAIL_ACCOUNT = 'testingcrye@gmail.com'
  20. TEST_GMAIL_PASSWORD = 'JetjlaWEIt342'
  21.  
  22. VERBOSE_LOG = lambda msg: sys.stderr.write("\n\tLOG: %s\n" % msg)
  23.  
  24.  
  25. class TestUtils(DatabaseTestCase):
  26. def setup(self):
  27. if hasattr(self, 'mailbox'):
  28. self.mailbox.close()
  29. self.mailbox.logout()
  30. self.user = User.objects.get(username=ADMIN_LOGIN)
  31. self.user.imap.username = TEST_GMAIL_ACCOUNT
  32. self.user.imap.password = TEST_GMAIL_PASSWORD
  33. self.user.imap.host = TEST_GMAIL_HOST
  34. self.user.imap.port = TEST_GMAIL_PORT
  35. self.user.imap.save()
  36. self.mailbox = self.user.imap.get_mailbox()
  37.  
  38.  
  39. def test_folders_sync(self):
  40. TEST_FOLDER = 'TestFolder'
  41. try:
  42. self.mailbox.delete(TEST_FOLDER)
  43. except:
  44. VERBOSE_LOG("Can't delete folder remotely: %s" % \
  45. str(sys.exc_info()))
  46.  
  47. retrieve_folders_for_user(self.user)
  48.  
  49. folders = [data.split()[-1].strip('"') \
  50. for data in self.mailbox.list()[1]]
  51.  
  52. for folder in folders:
  53. self.assert_read(Folder, user=self.user,
  54. name=imap_utf7.decode(folder))
  55. fldrs_count = self.user.folders.count()
  56.  
  57. # test deletion remote folders
  58. status, response = self.mailbox.create(TEST_FOLDER)
  59.  
  60. retrieve_folders_for_user(self.user)
  61. self.assert_equals(fldrs_count+1, \
  62. Folder.objects.filter(user=self.user).count())
  63. self.mailbox.delete(TEST_FOLDER)
  64.  
  65. retrieve_folders_for_user(self.user)
  66. VERBOSE_LOG(u", ".join([f.name for f in self.user.folders.all()]))
  67. self.assert_equals(Folder.objects.filter(user=self.user).count(), \
  68. fldrs_count+1)
  69. # re-create folder
  70. status, response = self.mailbox.create(TEST_FOLDER)
  71. retrieve_folders_for_user(self.user)
  72. self.assert_false(Folder.objects.filter(user=self.user, \
  73. name__exact=TEST_FOLDER)[0].is_archived)
  74.  
  75. self.mailbox.delete(TEST_FOLDER)
  76.  
  77. def __get_mail_attachments(self, message=None):
  78. attachments = []
  79.  
  80. # walk through email body
  81. for part in message.walk():
  82. # read message attachments
  83. if part.get_filename(None):
  84. filename = part.get_filename(None)
  85. if filename:
  86. filedata = part.get_payload(decode=True)
  87. if filedata:
  88. # get mime type and save to database if all is ok
  89. mime_type = part.get_content_type()
  90. attachments.append({
  91. 'mimetype': mime_type,
  92. 'name': filename,
  93. 'size': len(filedata),
  94. })
  95.  
  96. del filedata
  97.  
  98. return attachments
  99.  
  100. def test_messages_sync(self):
  101. # sync messages
  102. retrieve_messages_for_user(self.user)
  103.  
  104. folders = [data.split()[-1].strip('"') \
  105. for data in self.mailbox.list()[1]]
  106.  
  107. for folder in folders:
  108. # go to folder and retrieve messages
  109. status, resp = self.mailbox.select(prepare_folder_name(folder))
  110.  
  111. if not status == 'OK':
  112. continue
  113.  
  114. status, available_msg_ids = self.mailbox.search(None, 'ALL')
  115.  
  116. # fetch available mail and add it to database
  117. for msg_id in available_msg_ids[0].split():
  118. IMAP_COMMAND = '(UID RFC822)'
  119. status, response = self.mailbox.fetch(msg_id, IMAP_COMMAND)
  120.  
  121. print response[0][0]
  122.  
  123. matches = re.search("\d+? \(UID (\d+)? RFC822", response[0][0])
  124.  
  125. self.assert_not_equal(matches, None)
  126.  
  127. uid = matches.group(1)
  128.  
  129. try:
  130. rfc822_message = response[0][1]
  131. except IndexError:
  132. continue
  133.  
  134. message = email.message_from_string(rfc822_message)
  135. attachments = self.__get_mail_attachments(message=message)
  136. headers = values_by_headers(
  137. message,
  138. ['From', 'To', 'Subject', 'Message-Id', 'Cc', 'Bcc'])
  139.  
  140. message_id = headers['Message-Id']
  141. subject = headers['Subject']
  142. from_addr = headers['From']
  143. to_addr = headers['To']
  144. cc_addr = headers['Cc']
  145. bcc_addr = headers['Bcc']
  146. folder_name = imap_utf7.decode(folder)
  147.  
  148. msg_obj = Message.objects.get(msg_id=uid)
  149. self.assert_equals(msg_obj.msg_id, uid)
  150. self.assert_equals(msg_obj.subject.encode('utf-8'),
  151. email.header.decode_header(
  152. subject.decode('utf-8'))[0][0])
  153. self.assert_equals(msg_obj.to_addr, to_addr)
  154. self.assert_equals(msg_obj.from_addr, from_addr)
  155. self.assert_equals(msg_obj.bcc_addr, bcc_addr)
  156. self.assert_equals(msg_obj.cc_addr, cc_addr)
  157. self.assert_equals(msg_obj.folder.name, folder_name)
  158.  
  159. # test attachments
  160. self.assert_equals(msg_obj.attachments.all().count(), \
  161. len(attachments))
  162.  
  163. for att_details in attachments:
  164. name = att_details['name']
  165. mimetype = att_details['mimetype']
  166. size = att_details['size']
  167.  
  168. VERBOSE_LOG("Attachment: %s (%s) - %s bytes" % (
  169. name, mimetype, size))
  170.  
  171. # check size, mimetype and name of every item in
  172. # attachments in model
  173. self.assert_equals(msg_obj.attachments.filter(\
  174. name=name, mimetype=mimetype, size=size).count(), 1)
Add Comment
Please, Sign In to add comment