Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import re
- import sys
- import ssl
- import email
- import base64
- import socket
- __all__ = ['POP3Client', 'POP3ClientError']
- CRLF = b'\r\n'
- Commands = {'USER': ('AUTH',),
- 'PASS': ('AUTH',),
- 'QUIT': ('AUTH', 'TRANSACTION'),
- 'STAT': ('TRANSACTION'),
- 'TOP': ('TRANSACTION'),
- 'RETR': ('TRANSACTION'),
- 'LIST': ('TRANSACTION')}
- class IMAP:
- def __init__(self, host: str = 'pop.yandex.ru', port: int = 995):
- self._host = host
- self._port = port
- self._sock = self._create_socket()
- self._state = 'NOTAUTH'
- self._create_connection()
- @staticmethod
- def _create_socket():
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- ssl_sock = ssl.wrap_socket(sock)
- ssl_sock.settimeout(1)
- return ssl_sock
- except OSError as exception:
- print(f'ERROR! Could not create a socket. {exception}.')
- sys.exit()
- def _create_connection(self):
- try:
- self._sock.connect((self._host, self._port))
- try:
- self._receive()
- self._state = 'AUTH'
- print(f'Successful connection to: {self._host}')
- except POP3ClientError:
- print(f'Error connecting to: {self._host}')
- sys.exit()
- except socket.timeout as exception:
- print(f'ERROR! Could not create a connection: {exception}.')
- sys.exit()
- def _close_connection(self):
- self._sock.close()
- def _send_data(self, data: str):
- self._sock.sendall(data.encode() + CRLF)
- def _read_data(self) -> str:
- result = b''
- while True:
- try:
- data_part = self._sock.recv(1024)
- if data_part is None or data_part == b'':
- break
- result += data_part
- except socket.timeout:
- break
- return result.decode()
- def _receive(self):
- response = self._read_data()
- if not response.startswith('+'):
- raise POP3ClientError('ERROR!' + response[4:])
- return response[4:]
- def _is_valid_command(self, command: str) -> bool:
- states = Commands.get(command)
- if states is None or self._state not in states:
- return False
- return True
- def _execute(self, command: str) -> str:
- if self._is_valid_command(command[0:4].strip()):
- self._send_data(command)
- try:
- response = self._receive()
- return response
- except POP3ClientError as error:
- return f'Command Error: {error}'
- else:
- return 'Command Error: Wrong command or command can not be execute from this state'
- def user(self, user: str) -> str:
- return self._execute(f'USER {user}')
- def password(self, password: str) -> str:
- response = self._execute(f'PASS {password}')
- if response.startswith('Command Error'):
- return response
- self._state = 'TRANSACTION'
- return 'Successful authorization'
- def stat(self) -> str:
- return 'Messages: ' + self._execute('STAT')
- def head(self, msg: int, type:str) -> str:
- return type + ': ' + self.get_headers(self.top(msg,0))[type]
- def full(self, msg: int):
- message = self.retr(msg)
- if re.findall('Content-Type: multipart/mixed', message.split('\r\n\r\n')[0]):
- boundary = re.findall('boundary="(.*?)"', message)[0]
- content_chunks = re.split('--' + boundary, message)[1:-1]
- result = ''
- for chunk in content_chunks:
- header, content = chunk.split(CRLF.decode() * 2)
- if header.startswith(f'{CRLF.decode()}Content-Disposition: attachment;'):
- filename = re.findall('filename="(.*)"', header)[0]
- with open(filename, 'wb') as file:
- file.write(base64.b64decode(content.encode()))
- result += f'Attachment\r\nHeader:{header}\r\nFilename:{filename}\r\n'
- elif 'text/html' in header:
- result += f"Header:{header}\r\nContent:{re.findall('>(.*?)<',content)[0]}\r\n\r\n"
- else:
- result += f"Header:{header}\r\nContent:{content}\r\n\r\n"
- return result
- elif re.findall('Content-Type: text/html', message.split('\r\n\r\n')[0]):
- return re.findall('>(.*?)<', message.split('\r\n\r\n')[1])[0]
- else:
- return message.split(('\r\n\r\n')[1])
- def list(self, msg: int = None) -> str:
- return self._execute(f'LIST {msg}') if msg else self._execute('LIST')
- def retr(self, msg: int) -> str:
- return self._execute(f'RETR {msg}')
- def top(self, msg: int, num: int) -> str:
- return self._execute(f'TOP {msg} {num}')
- def quit(self) -> str:
- response = self._execute('QUIT')
- self._close_connection()
- return response
- def get_headers(self, message) -> dict:
- from_reg = re.findall('(From: )=\?utf-8\?B\?(.*?)\?= (.*?)', message)
- result = {}
- if from_reg:
- result['From'] = f'{self.get_base64_header(from_reg)} {from_reg[0][2]}'
- else:
- result['From'] = re.findall(f'From: (.*)\n', message)[0]
- result['To'] = re.findall(r'To: (.*?)\n', message)[0]
- result['Date'] = re.findall(r'Date: (.*?)\n', message)[0]
- subject_reg = re.findall('(Subject: |\t)=\?utf-8\?B\?(.*?)\?=', message)
- if subject_reg:
- result['Subject'] = f'{self.get_base64_header(subject_reg)}'
- else:
- result['Subject'] = re.findall(f'Subject: (.*)\n', message)[0]
- return result
- def get_base64_header(self, found) -> str:
- return ''.join([base64.b64decode(x[1].encode()).decode() for x in found])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement