Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import socket
- import base64
- import select
- import ssl
- import json
- import argparse
- import os
- import shutil
- from email.parser import Parser
- from email.policy import default
- CRLF = b'\r\n'
- class Config:
- def __init__(self, user, password, server, port, letters_dir, use_ssl):
- self.user = user
- self.password = password
- self.server = server
- self.port = port
- self.letters_dir = letters_dir
- self.use_ssl = use_ssl
- def load_config(filename):
- with open(filename, 'r') as config_file:
- d = json.load(config_file)
- return Config(d['user'], d['password'], d['server'], d['port'], d['letters_dir'], d['use_ssl'])
- def parse_args():
- parser = argparse.ArgumentParser(description='simple pop3 client')
- parser.add_argument('--config', required=False, default='mail_config.json', help='config file', type=str)
- parser.add_argument('--count', required=False, default=1, help='amount of letters to download', type=int)
- return parser.parse_args()
- class Attachment:
- def __init__(self, name, data):
- self.name = name
- self.data = data
- class LetterInfo:
- def __init__(self, text, subject, from_, to, date, attachments):
- self.text = text
- self.subject = subject
- self.from_ = from_
- self.to = to
- self.date = date
- self.attachments = attachments
- def save_at(self, path):
- if os.path.isdir(path):
- shutil.rmtree(path)
- os.mkdir(path)
- with open(os.path.join(path, 'letter.txt'), 'w') as letter_file:
- letter_file.write(f'From: {self.from_}\n')
- letter_file.write(f'To: {self.to}\n')
- letter_file.write(f'Date: {self.date}\n')
- letter_file.write(f'Subject: {self.subject}\n')
- letter_file.write(f'Attachments: {" ".join(map(lambda x: x.name, self.attachments))}\n')
- letter_file.write(f'Text:\n{self.text}\n')
- os.mkdir(os.path.join(path, 'attachments'))
- for attachment in self.attachments:
- with open(os.path.join(path, 'attachments', attachment.name), 'wb') as attachment_file:
- attachment_file.write(attachment.data)
- class Pop3Response:
- def __init__(self, successful, data):
- self.successful = successful
- self.data = data
- def __str__(self):
- if self.successful:
- return f'[SUCCESSFUL RESPONSE]\n{self.data}'
- return f'[FAILED RESPONSE]\n{self.data}'
- class Pop3Client:
- def __init__(self, host, port, user, password, use_ssl):
- self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if use_ssl:
- self.__sock = ssl.wrap_socket(self.__sock, ssl_version=ssl.PROTOCOL_SSLv23)
- self.__sock.connect((host, port))
- self.__sock.settimeout(10)
- self._login(user, password)
- def _login(self, user, password):
- print(f'Login for {user}...')
- _ = self._read_single_line()
- user_response = self.execute_raw('USER', [user])
- if not user_response.successful:
- raise ValueError(f'User command failed: {user_response.data}')
- pass_response = self.execute_raw('PASS', [password])
- if not pass_response.successful:
- raise ValueError(f'Password command failed: {pass_response.data}')
- print(f'Successfully login for: {user}')
- def __enter__(self):
- return self
- def __exit__(self, e, et, tb):
- self.__sock.close()
- return False
- def retr(self, i):
- resp = self.execute_raw('RETR', [i], is_long_cmd=True)
- if not resp.successful:
- print(resp.data)
- return
- headers = Parser(policy=default).parsestr(resp.data)
- from_ = headers['From']
- to = headers['To']
- date = headers['Date']
- subject = headers['Subject']
- parts = list(headers.iter_parts())
- text = parts[0].get_payload()
- attachments = []
- for i in range(1, len(parts)):
- attachment = self._read_attachment(parts[i])
- if attachment:
- attachments.append(attachment)
- return LetterInfo(text, subject, from_, to, date, attachments)
- def _read_attachment(self, h):
- name = ''
- if h['Content-Type'].maintype != 'image' or h['Content-Transfer-Encoding'].cte != 'base64':
- print(f'Cant read attachment of type: {h["Content-Type"].content_type} and cte {h["Content-Transfer-Encoding"].cte}')
- return None
- for p in h['Content-Type'].split(';'):
- if p.find('name=') == -1:
- continue
- pp = p.split('=')
- name = pp[1].strip('"')
- break
- data = base64.b64decode(''.join(h.get_payload().split()))
- return Attachment(name, data)
- def execute_raw(self, command, args=None, is_long_cmd=False):
- cmd = command.encode()
- if args:
- cmd += b' ' + b' '.join(map(lambda x: str(x).encode(), args))
- if not cmd.endswith(CRLF):
- cmd += CRLF
- self.__sock.sendall(cmd)
- if not is_long_cmd:
- return self._read_response_with_single_line()
- return self._read_long_response()
- def _read_long_response(self):
- resp = self._read_response_with_single_line()
- lines = []
- if not resp.successful:
- return resp
- line = self._read_single_line()
- while line != b'.':
- if not line.startswith(b'..'):
- lines.append(line)
- else:
- lines.append(b'.')
- line = self._read_single_line()
- return Pop3Response(True, '\n'.join(map(lambda x: x.decode(), lines)))
- def _read_response_with_single_line(self):
- line = self._read_single_line().decode()
- successful = line.startswith('+OK')
- data = (line[4:] if len(line) > 3 else line[3:]) if successful else (line[5:] if len(line) > 4 else line[4:])
- return Pop3Response(successful, data)
- def _read_single_line(self):
- line = bytearray()
- while True:
- line.extend(self.__sock.recv(1))
- if line.endswith(CRLF):
- break
- return line[:-2]
- def main():
- try:
- args = parse_args()
- config = load_config(args.config)
- if not os.path.isdir(config.letters_dir):
- os.mkdir(config.letters_dir)
- with Pop3Client(config.server, config.port, config.user, config.password, config.use_ssl) as client:
- for i in range(args.count):
- letter = client.retr(i + 1)
- letter.save_at(os.path.join(config.letters_dir, str(i + 1)))
- except (FileNotFoundError, KeyError):
- print('Invalid config file')
- except socket.timeout:
- print('Network operation timeout')
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement