Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import email
- from django.contrib.auth.models import User
- from tddspry.django import DatabaseTestCase
- from cryemail.mail.utils.folders import retrieve_folders_for_user, \
- prepare_folder_name
- from cryemail.mail.utils.messages import retrieve_messages_for_user
- from cryemail.mail.utils import values_by_headers
- from cryemail.mail.utils import imap_utf7
- from cryemail.mail.models import Folder, Message
- import re
- ADMIN_LOGIN = 'admin'
- ADMIN_PASSWORD = 'admin'
- TEST_GMAIL_HOST = "imap.gmail.com"
- TEST_GMAIL_PORT = 993
- TEST_GMAIL_ACCOUNT = 'testingcrye@gmail.com'
- TEST_GMAIL_PASSWORD = 'JetjlaWEIt342'
- VERBOSE_LOG = lambda msg: sys.stderr.write("\n\tLOG: %s\n" % msg)
- class TestUtils(DatabaseTestCase):
- def setup(self):
- if hasattr(self, 'mailbox'):
- self.mailbox.close()
- self.mailbox.logout()
- self.user = User.objects.get(username=ADMIN_LOGIN)
- self.user.imap.username = TEST_GMAIL_ACCOUNT
- self.user.imap.password = TEST_GMAIL_PASSWORD
- self.user.imap.host = TEST_GMAIL_HOST
- self.user.imap.port = TEST_GMAIL_PORT
- self.user.imap.save()
- self.mailbox = self.user.imap.get_mailbox()
- def test_folders_sync(self):
- TEST_FOLDER = 'TestFolder'
- try:
- self.mailbox.delete(TEST_FOLDER)
- except:
- VERBOSE_LOG("Can't delete folder remotely: %s" % \
- str(sys.exc_info()))
- retrieve_folders_for_user(self.user)
- folders = [data.split()[-1].strip('"') \
- for data in self.mailbox.list()[1]]
- for folder in folders:
- self.assert_read(Folder, user=self.user,
- name=imap_utf7.decode(folder))
- fldrs_count = self.user.folders.count()
- # test deletion remote folders
- status, response = self.mailbox.create(TEST_FOLDER)
- retrieve_folders_for_user(self.user)
- self.assert_equals(fldrs_count+1, \
- Folder.objects.filter(user=self.user).count())
- self.mailbox.delete(TEST_FOLDER)
- retrieve_folders_for_user(self.user)
- VERBOSE_LOG(u", ".join([f.name for f in self.user.folders.all()]))
- self.assert_equals(Folder.objects.filter(user=self.user).count(), \
- fldrs_count+1)
- # re-create folder
- status, response = self.mailbox.create(TEST_FOLDER)
- retrieve_folders_for_user(self.user)
- self.assert_false(Folder.objects.filter(user=self.user, \
- name__exact=TEST_FOLDER)[0].is_archived)
- self.mailbox.delete(TEST_FOLDER)
- def __get_mail_attachments(self, message=None):
- attachments = []
- # walk through email body
- for part in message.walk():
- # read message attachments
- if part.get_filename(None):
- filename = part.get_filename(None)
- if filename:
- filedata = part.get_payload(decode=True)
- if filedata:
- # get mime type and save to database if all is ok
- mime_type = part.get_content_type()
- attachments.append({
- 'mimetype': mime_type,
- 'name': filename,
- 'size': len(filedata),
- })
- del filedata
- return attachments
- def test_messages_sync(self):
- # sync messages
- retrieve_messages_for_user(self.user)
- folders = [data.split()[-1].strip('"') \
- for data in self.mailbox.list()[1]]
- for folder in folders:
- # go to folder and retrieve messages
- status, resp = self.mailbox.select(prepare_folder_name(folder))
- if not status == 'OK':
- continue
- status, available_msg_ids = self.mailbox.search(None, 'ALL')
- # fetch available mail and add it to database
- for msg_id in available_msg_ids[0].split():
- IMAP_COMMAND = '(UID RFC822)'
- status, response = self.mailbox.fetch(msg_id, IMAP_COMMAND)
- print response[0][0]
- matches = re.search("\d+? \(UID (\d+)? RFC822", response[0][0])
- self.assert_not_equal(matches, None)
- uid = matches.group(1)
- try:
- rfc822_message = response[0][1]
- except IndexError:
- continue
- message = email.message_from_string(rfc822_message)
- attachments = self.__get_mail_attachments(message=message)
- headers = values_by_headers(
- message,
- ['From', 'To', 'Subject', 'Message-Id', 'Cc', 'Bcc'])
- message_id = headers['Message-Id']
- subject = headers['Subject']
- from_addr = headers['From']
- to_addr = headers['To']
- cc_addr = headers['Cc']
- bcc_addr = headers['Bcc']
- folder_name = imap_utf7.decode(folder)
- msg_obj = Message.objects.get(msg_id=uid)
- self.assert_equals(msg_obj.msg_id, uid)
- self.assert_equals(msg_obj.subject.encode('utf-8'),
- email.header.decode_header(
- subject.decode('utf-8'))[0][0])
- self.assert_equals(msg_obj.to_addr, to_addr)
- self.assert_equals(msg_obj.from_addr, from_addr)
- self.assert_equals(msg_obj.bcc_addr, bcc_addr)
- self.assert_equals(msg_obj.cc_addr, cc_addr)
- self.assert_equals(msg_obj.folder.name, folder_name)
- # test attachments
- self.assert_equals(msg_obj.attachments.all().count(), \
- len(attachments))
- for att_details in attachments:
- name = att_details['name']
- mimetype = att_details['mimetype']
- size = att_details['size']
- VERBOSE_LOG("Attachment: %s (%s) - %s bytes" % (
- name, mimetype, size))
- # check size, mimetype and name of every item in
- # attachments in model
- self.assert_equals(msg_obj.attachments.filter(\
- name=name, mimetype=mimetype, size=size).count(), 1)
Add Comment
Please, Sign In to add comment