Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- 2 объекта для работы с почтой по IMAP4
- EmailRead - создает подключение к ящику и позволяет управлять письмами
- Message - само письмо
- Протестировано на yandex.ru
- """
- import imaplib
- import email
- from typing import List
- class Message:
- def __init__(self, email_id, body):
- self._body = body
- self._email_id = email_id
- def _get_payload(self, part):
- body = part.get_payload()
- if isinstance(body, list):
- part_list: list = []
- for i in body:
- part_list.append(self._get_payload(i))
- return '\n\n'.join(part_list)
- elif isinstance(body, str):
- return body
- else:
- return body.get_payload()
- def __str__(self):
- return f'FROM: {self.from_message}\nTO: {self.to}\nSUBJECT: {self.subject}\n\nMESSAGE:\n{self.message}'
- def __repr__(self):
- return self.__str__()
- @property
- def email_id(self):
- return self._email_id
- @staticmethod
- def _clean_symbols(text: str) -> str:
- return text.strip().replace('<', '').replace('>', '')
- @property
- def message(self) -> str:
- """Возвращает тело письма"""
- return self._get_payload(self._body)
- @property
- def subject(self) -> str:
- """Возвращает тему письма"""
- return email.header.decode_header(self._body['Subject'])[0][0].decode()
- @property
- def to(self) -> str:
- """Возвращает адресата"""
- text = email.header.decode_header(self._body['TO'])[0][0]
- return self._clean_symbols(text)
- @property
- def from_message(self) -> str:
- """Возвращает отправилтеля"""
- text = email.header.decode_header(self._body['FROM'])[1][0].decode()
- return self._clean_symbols(text)
- class EmailRead:
- IMAP_HOST: str = 'imap.yandex.ru'
- LOGIN: str = ''
- PASSWORD: str = ''
- FROM: str
- MAILBOX: str
- connect: imaplib
- def __init__(self, host: str = None, login: str = None, password: str = None):
- """
- :param host:
- :param login:
- :param password:
- """
- self.IMAP_HOST = host if host is not None else self.IMAP_HOST
- self.LOGIN = login if login is not None else self.LOGIN
- self.PASSWORD = password if password is not None else self.PASSWORD
- self.connect = imaplib.IMAP4_SSL(self.IMAP_HOST)
- self.connect.login(user=self.LOGIN, password=self.PASSWORD)
- def __del__(self):
- pass
- def search(self, from_mess: str, mailbox: str = 'INBOX') -> List[bytes]:
- """
- Возвращает список id писем по заданным параметрам
- :param from_mess: Поле от кого
- :param mailbox: Папка для поиска, по умолчанию INBOX - входящие, ALL - везда
- :return: Список id писем
- """
- self.FROM = from_mess
- self.MAILBOX = mailbox
- self.connect.select(mailbox=self.MAILBOX)
- status, mail_ids = self.connect.search(None, 'FROM', self.FROM)
- assert status == 'OK', f'bad status {status}'
- ids: list = mail_ids[0].split()
- ids.reverse()
- return ids
- def id_delete(self, email_id: bytes or list) -> None:
- """
- Удаляет письма из ящика
- :param email_id: список элементов или 1 элемент id писем возвращаемых self.search()
- :return: None
- """
- ids: list = email_id if isinstance(email_id, list) else [email_id]
- for i in ids:
- self.connect.store(i, '+FLAGS', '\\Deleted')
- self.connect.expunge()
- def message_delete(self, message: Message) -> None:
- """
- Удаляет письмо из почтового ящака
- :param message: Объект Message
- :return: None
- """
- self.id_delete(message.email_id)
- def read(self, email_id: bytes or list) -> List[Message]:
- list_ids: list = email_id if isinstance(email_id, list) else [email_id]
- messages: list = []
- for ids in list_ids:
- status, data = self.connect.fetch(ids, '(RFC822)')
- assert status == 'OK', f'bad status {status}'
- text = email.message_from_bytes(data[0][1])
- messages.append(Message(ids, text))
- return messages
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement