Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #Script by Ian R. Kelly
- #12/17/2023
- import mailbox
- import os
- import re
- import logging
- from email import policy
- from email.utils import parsedate
- from email.generator import BytesGenerator
- from datetime import datetime
- from io import BytesIO
- # Configuration
- #Use email1, email2 and target_names to filter the targets. In the case this script was written for, the targets are specific individuals whose correspondence is sought by the court.
- #mbox_file = 'YourMBOXFile'
- base_output_folder = 'SortedMessages'
- #email1 = '[email protected]'
- #email2 = '[email protected]'
- #target_names = ['Smith', 'Jones', 'White', 'Doe', '[email protected]']
# One case-insensitive pattern that matches either target address literally.
email_pattern = re.compile(
    '|'.join(re.escape(address) for address in (email1, email2)),
    re.IGNORECASE,
)

# One case-insensitive literal pattern per target name.
name_patterns = []
for name in target_names:
    name_patterns.append(re.compile(re.escape(name), re.IGNORECASE))

# Log bare messages both to export.log and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    handlers=[
        logging.FileHandler("export.log"),
        logging.StreamHandler(),
    ],
)
def sanitize_subject(subject):
    """Turn an email subject into text that is safe to embed in a file name.

    Every character that is neither a word character nor whitespace is
    removed, then each remaining whitespace character is replaced by an
    underscore.

    Args:
        subject: The raw Subject header value. ``None`` is treated as
            "No Subject"; any other non-string value (for example a header
            object) is converted with ``str()`` first.

    Returns:
        The sanitized subject as a ``str``.
    """
    if subject is None:
        subject = "No Subject"
    cleaned = re.sub(r'[^\w\s]', '', str(subject))
    # Replace *all* whitespace, not just spaces: folded headers can contain
    # tabs and line breaks, which must not end up inside a file name.
    return re.sub(r'\s', '_', cleaned)
def parse_email_date(date_str):
    """Parse an email ``Date`` header into a ``datetime``.

    Uses the standard library's RFC 5322 date parser instead of a single
    fixed ``strptime`` format, so headers without a weekday, with a named
    zone such as ``GMT``, or with a trailing ``(comment)`` are accepted, and
    parsing does not depend on the process locale.

    Args:
        date_str: The raw header value, or ``None`` when the header is
            missing. Non-string values are converted with ``str()``.

    Returns:
        A ``datetime`` on success (timezone-aware, except that a ``-0000``
        zone yields a naive value), or ``None`` if the header is missing or
        cannot be parsed. Parse failures are logged as errors.
    """
    # Local import: the file's top-level imports only bring in `parsedate`.
    from email.utils import parsedate_to_datetime

    if date_str is None:
        return None
    try:
        return parsedate_to_datetime(str(date_str))
    except (TypeError, ValueError):
        # Older Python versions raise TypeError for unparseable input,
        # newer ones raise ValueError; treat both as "unreadable date".
        logging.error(f"Failed to parse date: {date_str}")
        return None
def get_message_body(message):
    """Return the decoded text of the first body part of *message*.

    For a multipart message, the parts are walked in order and the first
    ``text/plain`` or ``text/html`` part that has no ``Content-Disposition``
    header is used. For a single-part message, its own payload is used.

    The payload is decoded with the charset declared on the part, falling
    back to UTF-8 when none is declared or the declared one is unknown.
    Undecodable bytes are dropped.

    Args:
        message: An ``email.message.Message`` (or compatible) object.

    Returns:
        The body text as a ``str``, or ``None`` when no suitable part exists.
    """
    def _decode(part):
        # get_payload(decode=True) yields None for container parts.
        raw = part.get_payload(decode=True)
        if raw is None:
            return None
        charset = part.get_content_charset() or 'utf-8'
        try:
            return raw.decode(charset, errors='ignore')
        except LookupError:
            # The message declared a charset Python does not know.
            return raw.decode('utf-8', errors='ignore')

    if not message.is_multipart():
        return _decode(message)

    for part in message.walk():
        # Parts carrying a Content-Disposition header are skipped here.
        if part.get("Content-Disposition") is not None:
            continue
        if part.get_content_type() in ('text/plain', 'text/html'):
            text = _decode(part)
            if text is not None:
                return text
    return None
def create_directory_structure(year, month):
    """Ensure the ``<base_output_folder>/<year>/<month>`` directory exists.

    Args:
        year: The year component; converted with ``str()``.
        month: The month component, as a string or an integer. It is
            left-padded with zeros to two characters.

    Returns:
        The path of the (now existing) directory.
    """
    path = os.path.join(base_output_folder, str(year), str(month).zfill(2))
    # exist_ok avoids the race between checking for the directory and
    # creating it.
    os.makedirs(path, exist_ok=True)
    return path
# Main export loop: scan the mbox, and for every message that mentions one of
# the target addresses AND one of the target names, write the message out as
# a text file (plus its attachments) under <base_output_folder>/<year>/<month>.

# Only messages dated within this inclusive range of years are exported.
first_year = 2022
last_year = 2022

print(f"Searching {mbox_file} for specific emails. This could take a while...")
mbox = mailbox.mbox(mbox_file)
try:
    total_messages = len(mbox)
    print(f"Loaded {total_messages} messages.")
    for i, message in enumerate(mbox, start=1):
        print(f"Processing message {i} of {total_messages}...", end="\r")
        try:
            email_date = parse_email_date(message['Date'])
            raw_subject = message['Subject']
            subject = sanitize_subject(raw_subject if raw_subject is not None else "No Subject")
            date_str = email_date.strftime('%b-%d-%Y') if email_date else "Date Corrupted or Unreadable."
            # NOTE(review): two matching messages with the same date and
            # subject map to the same file name, so the later one overwrites
            # the earlier one. Confirm whether that is acceptable.
            filename = f"{date_str}.{subject}.txt"

            # Serialize the message once; it is searched by several patterns.
            raw_text = message.as_string()
            mentions_address = email_pattern.search(raw_text) is not None
            mentions_name = any(pattern.search(raw_text) for pattern in name_patterns)
            if not (mentions_address and mentions_name):
                print(f'{i} - No relevant addresses found, moving on.')
                continue

            # Skip messages with an unreadable date or outside the year window.
            if email_date is None or not (first_year <= email_date.year <= last_year):
                continue

            from_header = message['From']
            to_header = message['To']
            body = get_message_body(message)
            if body is None:
                # No text part was found; write an empty body rather than
                # the literal text "None".
                body = ""
            year_month_path = create_directory_structure(email_date.year, email_date.strftime('%m'))
            filepath = os.path.join(year_month_path, filename)

            # Write the email out as a text file.
            with open(filepath, 'w', encoding='utf-8') as text_file:
                text_file.write(f"Date: {date_str}\nFrom: {from_header}\nTo: {to_header}\nSubject: {subject}\n\n{body}")

            # Capture the attachments and export them in their original
            # format (.pdf, .png, .doc, etc).
            if message.is_multipart():
                attachments_folder = os.path.join(year_month_path, f"{filename}.ATTACHMENTS")
                for part in message.walk():
                    if part.get_content_maintype() == 'multipart':
                        continue
                    if part.get('Content-Disposition') is None:
                        continue
                    attachment_filename = part.get_filename()
                    if not attachment_filename:
                        continue
                    # The name comes from the email itself: keep only its last
                    # path component so it cannot escape the attachments folder.
                    safe_name = os.path.basename(str(attachment_filename).replace('\\', '/'))
                    if safe_name in ('', '.', '..'):
                        continue
                    payload = part.get_payload(decode=True)
                    if payload is None:
                        continue
                    # Create the folder only when there is something to store.
                    os.makedirs(attachments_folder, exist_ok=True)
                    with open(os.path.join(attachments_folder, safe_name), 'wb') as f:
                        f.write(payload)
        except UnicodeEncodeError as e:
            logging.error(f"UnicodeEncodeError processing message {i}: {e}")
        except Exception as e:
            # Best effort: one bad message must not abort the whole export.
            logging.error(f"Unexpected error processing message {i}: {e}")
finally:
    # Release the mbox file handle even if the scan is interrupted.
    mbox.close()
print("Job's finished.")
Advertisement
Add Comment
Please, Sign In to add comment