Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #########################################################################
- # -*- coding: utf-8 -*- #
- #""" #
- # poweremail account #
- # ~~~~~~~~ #
- #Poweremail is the email tool for openerp - "made with good intentions" #
- #:copyright: (c) 2009-2010 by Sharoon Thomas #
- #:copyright: (c) 2011 by Openlabs Technologies & Consulting (P) Limited.#
- #:license: GPL, see LICENSE for more details. #
- #""" #
- #########################################################################
- import string
- import re
- import smtplib
- import poplib
- import imaplib
- import base64
- import time
- import datetime
- from email import Encoders, message_from_string
- from email.mime.base import MIMEBase
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from email.header import decode_header, Header
- from email.utils import formatdate
- from osv import osv, fields
- from tools.translate import _
- import netsvc
- import tools
- import helpers
class Account(osv.osv):
    """
    Email Account Settings

    Holds the outgoing (SMTP) and incoming (IMAP/POP3) settings of a mail
    account, and the helpers used to test the connections, send mail and
    download mail into ``poweremail.mailbox``.

    .. note:: The web client does not show a valid message when the
              'Test Incoming Connection' is done
    .. warning:: The email usernames and passwords are stored as plain text
                 in the database. Store them at your own risk.
    """
    _name = "poweremail.account"
    _description = "Email Accounts"

    _known_content_types = [
        'multipart/mixed',
        'multipart/alternative',
        'multipart/related',
        'text/plain',
        'text/html',
    ]

    # Most settings are read-only unless the account is in 'draft' state.
    _columns = {
        'name': fields.char('Name', size=64, required=True, readonly=True,
            select=True, states={'draft': [('readonly', False)]}),
        'user': fields.many2one('res.users', 'Owner', required=True,
            readonly=True, states={'draft': [('readonly', False)]}),
        'email_id': fields.char('Email ID', size=120, required=True,
            readonly=True, states={'draft': [('readonly', False)]},
            help="eg : yourname@yourdomain.com "),
        'smtp_server': fields.char('Outgoing Mail Server', size=120,
            required=True, readonly=True,
            states={'draft': [('readonly', False)]},
            help="eg : smtp.gmail.com "),
        'smtp_port': fields.integer('SMTP Port', size=64, required=True,
            readonly=True, states={'draft': [('readonly', False)]},
            help="eg : SMTP-587 "),
        'smtp_username': fields.char('User Name', size=120, required=False,
            readonly=True, states={'draft': [('readonly', False)]}),
        'smtp_password': fields.char('Password', size=20, invisible=True,
            required=False, readonly=True,
            states={'draft': [('readonly', False)]}),
        'smtp_use_tls': fields.boolean('Use TLS',
            states={'draft': [('readonly', False)]}, readonly=True),
        'smtp_use_ssl': fields.boolean('Use SSL/TLS (only in python 2.6)',
            states={'draft': [('readonly', False)]}, readonly=True),
        'send_preference': fields.selection(
            [
                ('html', 'HTML otherwise Text'),
                ('text', 'Text otherwise HTML'),
                ('both', 'Both HTML & Text'),
            ], 'Mail Format', required=True),
        'incoming_server': fields.char('Incoming mail server', size=100,
            readonly=True, states={'draft': [('readonly', False)]},
            help="eg : imap.gmail.com "),
        'incoming_port': fields.integer('Port', readonly=True,
            states={'draft': [('readonly', False)]},
            help="For example IMAP: 993,POP3: 995 "),
        'incoming_username': fields.char('User Name', size=100,
            readonly=True, states={'draft': [('readonly', False)]}),
        'incoming_password': fields.char('Password', size=100,
            readonly=True, states={'draft': [('readonly', False)]}),
        'incoming_protocol': fields.selection(
            [
                ('imap', 'IMAP'),
                ('pop3', 'POP3'),
            ], 'Server Type', readonly=True,
            states={'draft': [('readonly', False)]}),
        'incoming_use_ssl': fields.boolean('Use SSL', readonly=True,
            states={'draft': [('readonly', False)]}),
        'imap_folder': fields.char('Folder', readonly=True, size=100,
            help="Folder to be used for downloading IMAP mails. "
                 "Click on adjacent button to select from a list of folders"),
        'last_downloaded_mail': fields.integer('Last Downloaded Mail',
            readonly=True),
        # NOTE(review): the label and the help text of this field describe
        # opposite meanings; confirm which one is intended.
        'use_complete_download': fields.boolean(
            'First Receive headers, then download mail',
            help="Download complete email(else only headers are downloaded)"),
        'use_download_attachments': fields.boolean(
            'Download attachments automatically'),
        'is_shared_account': fields.boolean('Company Mail A/c', readonly=True,
            help="Select if this mail account does not belong "
                 "to specific user but the organisation as a whole. "
                 "eg : info@somedomain.com",
            states={'draft': [('readonly', False)]}),
        'state': fields.selection(
            [
                ('draft', 'Initiated'),
                ('suspended', 'Suspended'),
                ('approved', 'Approved'),
            ], 'Account Status', required=True, readonly=True),
    }

    def default_name(self, cursor, user, context=None):
        "Returns the default value for name (the name of the current user)"
        user_obj = self.pool.get('res.users')
        return user_obj.browse(cursor, user, user, context).name

    _defaults = {
        'name': default_name,
        'smtp_use_ssl': lambda *a: True,
        'state': lambda *a: 'draft',
        'user': lambda obj, cursor, user, context: user,
        'incoming_protocol': lambda *a: 'imap',
        'incoming_use_ssl': lambda *a: True,
        'last_downloaded_mail': lambda *a: 0,
        'use_complete_download': lambda *a: True,
        'use_download_attachments': lambda *a: True,
        'send_preference': lambda *a: 'html',
        'smtp_use_tls': lambda *a: True,
    }

    _sql_constraints = [
        ('email_uniq', 'unique (email_id)',
         'Another setting already exists with this email ID !'),
    ]

    def on_change_emailid(self, cursor, user, id, email_id=None,
                          smtp_username=None, incoming_username=None,
                          context=None):
        """
        Called when the email ID field changes (UI enhancement).

        Copies the email ID into the SMTP user name and the incoming user
        name when those are still empty, and resets the state to draft.

        :return: on_change dictionary ``{'value': {...}}``
        """
        res = {'state': 'draft'}
        if not smtp_username:
            res['smtp_username'] = email_id
        if not incoming_username:
            res['incoming_username'] = email_id
        return {'value': res}

    def _get_outgoing_server(self, cursor, user, accountid, context=None):
        """
        Returns the Out Going Connection (SMTP) object

        .. warning:: DO NOT USE except_osv IN THIS METHOD

        :param cursor: Database Cursor
        :param user: ID of current user
        :param accountid: ID (or list of IDs, the first one is used) of the
                          account for which connection is required
        :param context: Context
        :return: SMTP server object or Exception
        """
        # Buttons pass a list of ids, internal callers pass a single id.
        if isinstance(accountid, (list, tuple)):
            if not accountid:
                raise Exception(_("Account not found"))
            accountid = accountid[0]
        this_object = self.browse(cursor, user, accountid, context)
        if not this_object:
            raise Exception(_("Account not found"))
        if not (this_object.smtp_server and this_object.smtp_port):
            raise Exception(_("SMTP SERVER or PORT not specified"))

        if this_object.smtp_use_ssl:
            server = smtplib.SMTP_SSL(this_object.smtp_server,
                                      this_object.smtp_port)
        else:
            server = smtplib.SMTP(this_object.smtp_server,
                                  this_object.smtp_port)
        try:
            if this_object.smtp_use_tls:
                server.ehlo()
                server.starttls()
                server.ehlo()
            # Empty char fields are read back as False, so only log in when
            # credentials are configured and never call .encode on False.
            if this_object.smtp_username or this_object.smtp_password:
                server.login(
                    (this_object.smtp_username or '').encode('UTF-8'),
                    (this_object.smtp_password or '').encode('UTF-8')
                )
        except Exception:
            # Do not leak the socket when the handshake or login fails.
            server.close()
            raise
        return server

    def check_outgoing_connection(self, cursor, user, accountid,
                                  context=None):
        """
        checks SMTP credentials and confirms if outgoing connection works
        (Attached to button)

        The result is always reported by raising ``except_osv``: that is how
        both the success and the failure message reach the client.

        :param cursor: Database Cursor
        :param user: ID of current user
        :param accountid: list of ids of current object for
                          which connection is required
        :param context: Context
        """
        try:
            server = self._get_outgoing_server(cursor, user, accountid,
                                               context)
        except osv.except_osv:
            raise
        except Exception as error:
            raise osv.except_osv(
                _("Out going connection test failed"),
                _("Reason: %s") % error
            )
        # This was only a test, release the connection.
        server.close()
        raise osv.except_osv(
            _('SMTP Connection'),
            _("SMTP Test Connection Was Successful")
        )

    def _get_imap_server(self, record):
        """
        :param record: Browse record of current connection
        :return: logged in IMAP or IMAP_SSL object
        """
        if record.incoming_use_ssl:
            server = imaplib.IMAP4_SSL(record.incoming_server,
                                       record.incoming_port)
        else:
            server = imaplib.IMAP4(record.incoming_server,
                                   record.incoming_port)
        server.login(record.incoming_username, record.incoming_password)
        return server

    def _get_pop3_server(self, record):
        """
        :param record: Browse record of current connection
        :return: authenticated POP3 or POP3_SSL object
        """
        if record.incoming_use_ssl:
            server = poplib.POP3_SSL(record.incoming_server,
                                     record.incoming_port)
        else:
            server = poplib.POP3(record.incoming_server,
                                 record.incoming_port)
        server.user(record.incoming_username)
        server.pass_(record.incoming_password)
        return server

    def _close_incoming_server(self, server):
        """
        Best effort shutdown of an incoming connection.

        :param server: object returned by :meth:`_get_incoming_server`
                       (imaplib exposes ``logout``, poplib exposes ``quit``)
        """
        closer = getattr(server, 'logout', None) \
            or getattr(server, 'quit', None)
        if closer is None:
            return
        try:
            closer()
        except (imaplib.IMAP4.error, poplib.error_proto, IOError):
            # The work is already done; a failing goodbye is not an error.
            pass

    def _get_incoming_server(self, cursor, user, account_id, context=None):
        """
        Returns the Incoming Server object
        Could be IMAP/IMAP_SSL/POP3/POP3_SSL

        .. warning:: DO NOT USE except_osv IN THIS METHOD

        :param cursor: Database Cursor
        :param user: ID of current user
        :param account_id: ID (or list of IDs, the first one is used) of the
                           account for which connection is required
        :param context: Context
        :return: IMAP/POP3 server object or Exception
        """
        if isinstance(account_id, (list, tuple)):
            if not account_id:
                raise Exception(_("Account not found"))
            account_id = account_id[0]
        this_object = self.browse(cursor, user, account_id, context)
        if not this_object:
            raise Exception(_("Account not found"))
        if not this_object.incoming_server:
            raise Exception(_("Incoming server is not defined"))
        if not this_object.incoming_port:
            raise Exception(_("Incoming port is not defined"))
        if not this_object.incoming_username:
            raise Exception(_("Incoming server user name is not defined"))
        if not this_object.incoming_password:
            raise Exception(_("Incoming server password is not defined"))

        if this_object.incoming_protocol == 'imap':
            return self._get_imap_server(this_object)
        if this_object.incoming_protocol == 'pop3':
            return self._get_pop3_server(this_object)
        raise Exception(_("Incoming server type is not defined"))

    def check_incoming_connection(self, cursor, user, accountid,
                                  context=None):
        """
        checks incoming credentials and confirms if incoming connection works
        (Attached to button)

        The result is always reported by raising ``except_osv``.

        :param cursor: Database Cursor
        :param user: ID of current user
        :param accountid: list of ids of current object for
                          which connection is required
        :param context: Context
        """
        try:
            server = self._get_incoming_server(cursor, user, accountid,
                                               context)
        except osv.except_osv:
            raise
        except Exception as error:
            raise osv.except_osv(
                _("Incoming connection test failed"),
                _("Reason: %s") % error)
        # This was only a test, release the connection.
        self._close_incoming_server(server)
        raise osv.except_osv(
            _("Incoming Connection"),
            _("Incoming Test Connection Was Successful"))

    def approve(self, cursor, user, accountid, context=None):
        """
        Approves the given mail account

        :param accountid: ID or list of IDs of the accounts to approve
        :return: result of the ``write`` call
        """
        return self.write(cursor, user, accountid,
                          {'state': 'approved'},
                          context=context)

    def send_mail(self, cursor, user, id, recepients, message, context=None):
        """
        Sends an email through a core account

        :param id: ID of the core account
        :param recepients: A list of email ids in strings
        :param message: MIMEMessage
        :param context: Open ERP Context
        :return: True if successful, else raises exception
        """
        if context is None:
            context = {}
        assert isinstance(id, (int, long)), \
            "send_mail expects the id of a single account"
        account = self.browse(cursor, user, id, context)
        server = self._get_outgoing_server(cursor, user, id, context)
        try:
            # The display name is RFC 2047 encoded; the address itself has
            # to stay readable and is also used as the SMTP envelope sender.
            del message['From']
            message['From'] = '%s <%s>' % (
                Header(account.name, 'utf-8').encode(), account.email_id)
            # NOTE(review): the original read `user.is_shared_account_id`,
            # which is not a res.users field; `company_id` is presumably
            # what was meant - confirm.
            message['Organisation'] = tools.ustr(account.user.company_id.name)
            # NOTE(review): split_to_ids is not defined in this class; it is
            # assumed to be provided elsewhere - verify.
            addresses = self.split_to_ids(','.join(recepients))
            server.sendmail(account.email_id, addresses, message.as_string())
        finally:
            server.close()
        return True

    def save_to_mailbox(self, cursor, user, message, account_id, message_id,
                        is_new_mail, read, context=None):
        """
        Save Headers into mail

        :param message: Email Message object
        :param account_id: ID of the account
        :param message_id: ID of the message on server
        :param is_new_mail: Passed through to ``poweremail.mailbox.from_email``
        :param read: Boolean to indicate if the mail is read
        :param context: Context of Open ERP
        :return: whatever ``from_email`` returns
        """
        mail_obj = self.pool.get('poweremail.mailbox')
        values = {
            'state': 'read' if read else 'unread',
            'server_ref': message_id,
            'pem_account_id': account_id,
        }
        # NOTE(review): update_mail calls from_email with one positional
        # argument less than this call - check against from_email.
        return mail_obj.from_email(cursor, user, is_new_mail, message_id,
                                   values, message, context)

    def update_mail(self, cursor, user, message, account_id, message_id, read,
                    mailbox_mail_id, context=None):
        """
        Complete an existing email

        :param message: Email Message object
        :param account_id: ID of the account
        :param message_id: ID of the message on server
        :param read: Boolean to indicate if the mail is read
        :param mailbox_mail_id: ID of mail which needs updation
        :param context: Context of Open ERP
        :return: whatever ``from_email`` returns
        """
        mail_obj = self.pool.get('poweremail.mailbox')
        values = {
            'state': 'read' if read else 'unread',
            'server_ref': message_id,
            'pem_account_id': account_id,
        }
        return mail_obj.from_email(cursor, user, mailbox_mail_id,
                                   values, message, context)

    def _receive_mails_imap(self, account, message_id, server,
                            header_only=False):
        """
        Receives mail from imap

        :param account: Browse record for core account
        :param message_id: ID of message on server
        :param server: IMAP/IMAP_SSL server object with folder selected
        :param header_only: If only header has to be downloaded?
        :return: tuple of (email(message), seen (bool))
        """
        if header_only:
            # PEEK so that fetching the headers does not mark the mail read
            query = '(FLAGS BODY.PEEK[HEADER])'
        else:
            query = '(FLAGS RFC822)'
        typ, data = server.fetch(str(message_id), query)
        # data comes in a list [('PART1','PART2')]: PART1 is the envelope
        # of the response (it carries the FLAGS), PART2 the requested text.
        envelope, raw_mail = data[0][0], data[0][1]
        # IMAP system flags start with a backslash: \Seen
        return message_from_string(raw_mail), ('\\Seen' in envelope)

    def _receive_mails_pop(self, account, message_id, server,
                           header_only=False):
        """
        Receives mail from pop

        :param account: Browse record for core account
        :param message_id: ID of message on server
        :param server: POP/POP_SSL server object
        :param header_only: If only header has to be downloaded?
        :return: tuple of (email(message), seen (bool)); POP3 has no read
                 flag so seen is always False
        """
        if header_only:
            # headers plus the first 20 lines of the body
            resp, lines, octets = server.top(message_id, 20)
        else:
            resp, lines, octets = server.retr(message_id)
        # poplib returns the mail as a list of lines without line endings
        return message_from_string("\n".join(lines)), False

    def receive_mails(self, cursor, user, ids, context=None):
        """
        Receives emails for the given accounts

        Downloads every message numbered after ``last_downloaded_mail`` and
        records the progress after each message.

        :param ids: list of account ids
        :return: True
        """
        for account in self.browse(cursor, user, ids, context):
            server = self._get_incoming_server(cursor, user, account.id,
                                               context)
            try:
                if account.incoming_protocol == 'imap':
                    typ, msg_count = server.select(
                        '"%s"' % account.imap_folder)
                    last_on_server = int(msg_count[0])
                    fetch = self._receive_mails_imap
                    # NOTE(review): kept from the original, but passing
                    # "download attachments" as header_only looks
                    # suspicious - confirm the intended option.
                    header_only = account.use_download_attachments
                else:
                    # POP3
                    last_on_server = server.stat()[0]
                    fetch = self._receive_mails_pop
                    # The original read `rec_headers_den_mail`, which is not
                    # a column. `use_complete_download` carries the label
                    # 'First Receive headers, then download mail' and is
                    # presumably its new name - TODO confirm.
                    header_only = account.use_complete_download

                first_new = account.last_downloaded_mail + 1
                for message_id in xrange(first_new, last_on_server + 1):
                    message, seen = fetch(account, message_id, server,
                                          header_only)
                    self.save_to_mailbox(cursor, user, message, account.id,
                                         message_id, True, seen, context)
                    self.write(cursor, user, account.id,
                               {'last_downloaded_mail': message_id}, context)
            finally:
                self._close_incoming_server(server)
        return True

    def send_receive(self, cursor, user, ids, context=None):
        """
        Typical Send-Receive functionality

        :param ids: list of account ids
        :return: True
        """
        mailbox_obj = self.pool.get('poweremail.mailbox')
        self.receive_mails(cursor, user, ids, context)
        for account_id in ids:
            # The original passed an undefined name as context. Presumably
            # send_all_mail restricts itself to context['filters'] -
            # verify against poweremail.mailbox.
            ctx = dict(context or {})
            ctx['filters'] = [('pem_account_id', '=', account_id)]
            mailbox_obj.send_all_mail(cursor, user, [], context=ctx)
        return True

    def get_mailbody(self, cursor, user, message_id, account, server_ref,
                     context=None):
        """
        Downloads the complete mail for a mail already in the mailbox

        Does nothing when the incoming settings of the account are
        incomplete.

        :param message_id: ID passed on to ``save_to_mailbox``
        :param account: Browse record of the account
        :param server_ref: ID of the message on the server
        :param context: Context of Open ERP
        """
        if not account:
            return
        if not (account.incoming_server and account.incoming_port
                and account.incoming_username and account.incoming_password):
            return
        is_imap = account.incoming_protocol == 'imap'
        if is_imap and not account.imap_folder:
            return

        server = self._get_incoming_server(cursor, user, account.id, context)
        try:
            if is_imap:
                server.select('"%s"' % account.imap_folder)
                message, seen = self._receive_mails_imap(account, server_ref,
                                                         server)
            else:
                # POP3
                message, seen = self._receive_mails_pop(account, server_ref,
                                                        server)
            # NOTE(review): update_mail looks like the better fit for
            # completing an existing mail; kept as in the original.
            self.save_to_mailbox(cursor, user, message, account.id,
                                 message_id, False, seen, context)
        finally:
            self._close_incoming_server(server)

    def get_fullmail(self, cursor, user, mailid, context=None):
        """
        Downloads the full content of the given mailbox mail

        :param mailid: ID of the ``poweremail.mailbox`` record
        :param context: Context of Open ERP
        """
        mailbox_obj = self.pool.get('poweremail.mailbox')
        mail = mailbox_obj.read(cursor, user, mailid,
                                ['server_ref', 'pem_account_id'], context)
        if not mail['pem_account_id']:
            # mail is not linked to an account, nothing to download from
            return
        # many2one values are read as (id, name)
        account = self.browse(cursor, user, mail['pem_account_id'][0],
                              context)
        self.get_mailbody(cursor, user, mailid, account, mail['server_ref'],
                          context)

Account()
class PoweremailSelectIMAPFolder(osv.osv_memory):
    """
    Wizard which lists the IMAP folders of the active account and stores
    the selected one on the account.
    """
    _name = "poweremail.select_imap_folder"
    _description = "Shows a list of IMAP folders"

    def make_readable(self, imap_folder):
        r"""
        We consider imap_folder may be in one of the following formats:
        A string like this: '(\HasChildren) "/" "INBOX"'
        Or a tuple like this: ('(\\HasNoChildren) "/" {18}', 'INBOX/contacts')
        This function cleans this into a readable name, with the levels of
        the hierarchy joined as parent/child/grandchild.

        :param imap_folder: one entry of the list returned by the server
        :return: readable folder name, or False when there is no entry or
                 it cannot be parsed
        """
        if not imap_folder:
            return False
        if isinstance(imap_folder, tuple):
            return imap_folder[1]
        result = re.search(
            r'(?:\([^\)]*\)\s\")(.)(?:\"\s)(?:\")?([^\"]*)(?:\")?',
            imap_folder)
        if result is None:
            return False
        # group 1 is the hierarchy separator used by the server,
        # group 2 the raw folder name
        separator, raw_name = result.groups()
        return raw_name.replace(separator, '/')

    def _get_folders(self, cursor, user, context=None):
        """
        Get the selectable folders of the active account

        :param context: Context, the account is ``context['active_ids'][0]``
        :return: list of (value, label) tuples for the selection field;
                 ``[('invalid', 'Invalid')]`` when there is no active account
        """
        if context is None:
            context = {}
        active_ids = context.get('active_ids')
        if not active_ids:
            return [('invalid', 'Invalid')]

        account_obj = self.pool.get('poweremail.account')
        folderlist = []
        server = None
        try:
            server = account_obj._get_incoming_server(
                cursor, user, active_ids[0], context)
            for folders in server.list()[1]:
                folder_readable_name = self.make_readable(folders)
                if isinstance(folders, tuple):
                    data = folders[0] + folders[1]
                else:
                    data = folders
                # Folders flagged Noselect can not be opened
                if 'Noselect' in data or not folder_readable_name:
                    continue
                folderlist.append(
                    (folder_readable_name, folder_readable_name))
                if folder_readable_name == 'INBOX':
                    self.inboxvalue = folder_readable_name
        except Exception as error:
            raise osv.except_osv(
                _("IMAP Server Folder Error"),
                _("An error occurred : %s ") % error)
        finally:
            # Best effort goodbye; imaplib exposes logout, poplib quit.
            closer = getattr(server, 'logout', None) \
                or getattr(server, 'quit', None)
            if closer is not None:
                try:
                    closer()
                except (imaplib.IMAP4.error, poplib.error_proto, IOError):
                    pass
        return folderlist

    _columns = {
        'name': fields.many2one('poweremail.account', string='Email Account',
            readonly=True),
        'folder': fields.selection(_get_folders, string="IMAP Folder"),
    }

    _defaults = {
        # The wizard is opened from an account: default to that account.
        'name': lambda self, cursor, user, context: (
            context['active_ids'][0]
            if context and context.get('active_ids') else False),
    }

    def select_folder(self, cursor, user, account_id, context=None):
        """
        Stores the folder selected in the wizard on the active account

        :param account_id: ID (or list of IDs) of the wizard record
        :param context: Context, the account written to is
                        ``context['active_ids'][0]``
        :return: action closing the wizard window
        :raises osv.except_osv: when no folder or an invalid folder
                                is selected
        """
        if context is None:
            context = {}
        if isinstance(account_id, (list, tuple)):
            wizard_ids = list(account_id)
        else:
            wizard_ids = [account_id]

        records = self.read(cursor, user, wizard_ids, ['folder'], context)
        folder = records[0]['folder'] if records else False
        if not folder:
            raise osv.except_osv(
                _("Folder Error"),
                _("Select a folder before you save record"))
        if folder == 'invalid':
            raise osv.except_osv(
                _("Folder Error"),
                _("This is an invalid folder"))

        self.pool.get('poweremail.account').write(
            cursor, user, context['active_ids'][0],
            {'imap_folder': folder}, context=context)
        return {
            'type': 'ir.actions.act_window_close'
        }

PoweremailSelectIMAPFolder()
Add Comment
Please, Sign In to add comment