Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- """
- mail_url_worker.py
- Runs continuously (or as a systemd service). Every 5 seconds it checks the IMAP mailbox
- for messages whose subject contains "url", parses the URLs from the message text,
- downloads them with the selected utility, packs the result into a ZIP archive
- and sends it back to the sender.
- """
- import imaplib
- import email
- from email.header import decode_header, make_header
- from email.message import EmailMessage
- import smtplib
- import subprocess
- import tempfile
- import zipfile
- import os
- import time
- import re
- import logging
- from typing import List
- from pathlib import Path
# --- Configuration constants ---
# Every connection setting can be overridden with an environment variable
# named MAIL_URL_WORKER_<NAME>, so real credentials never have to be written
# into this file. The literals below are only fallback placeholders.
IMAP_HOST = os.environ.get("MAIL_URL_WORKER_IMAP_HOST", "imap.example.com")
IMAP_PORT = int(os.environ.get("MAIL_URL_WORKER_IMAP_PORT", "993"))
IMAP_USER = os.environ.get("MAIL_URL_WORKER_IMAP_USER", "[email protected]")
IMAP_PASS = os.environ.get("MAIL_URL_WORKER_IMAP_PASS", "your_imap_password")
SMTP_HOST = os.environ.get("MAIL_URL_WORKER_SMTP_HOST", "smtp.example.com")
SMTP_PORT = int(os.environ.get("MAIL_URL_WORKER_SMTP_PORT", "587"))
SMTP_USER = os.environ.get("MAIL_URL_WORKER_SMTP_USER", "[email protected]")
SMTP_PASS = os.environ.get("MAIL_URL_WORKER_SMTP_PASS", "your_smtp_password")
MAILBOX = "INBOX"
CHECK_INTERVAL = 5  # seconds between two mailbox polls

# Available external download utilities and their command templates.
# Each template is an argv list; every element is passed through
# str.format(url=..., outdir=...). {url} is required, {outdir} is optional
# because the command is executed via subprocess with cwd=outdir.
DOWNLOADERS = {
    "single-page-with-images": [
        "wget",
        "--page-requisites",
        "--convert-links",
        "--no-parent",
        "--adjust-extension",
        "-E",
        "{url}",
    ],
    "recursive-depth-1": [
        "wget",
        "-r",
        "-l",
        "1",
        "--no-parent",
        "--adjust-extension",
        "-E",
        "{url}",
    ],
    # Example of an additional entry:
    #   "httrack": ["httrack", "{url}", "-O", "{outdir}"]
}

# DOWNLOADERS key used when the message text does not name a downloader.
DEFAULT_DOWNLOADER = "single-page-with-images"

# Simple pattern for pulling URLs out of text: an http(s) scheme followed by
# everything up to whitespace, an angle bracket or a quote character.
URL_RE = re.compile(r"https?://[^\s<>\"']+")

# Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# --- Helper functions ---
def imap_connect():
    """Open an IMAP-over-SSL session and log in.

    Returns:
        The ``imaplib.IMAP4_SSL`` object connected to IMAP_HOST:IMAP_PORT and
        authenticated as IMAP_USER. No mailbox is selected yet.
    """
    logging.info("Connecting to IMAP %s:%s", IMAP_HOST, IMAP_PORT)
    connection = imaplib.IMAP4_SSL(host=IMAP_HOST, port=IMAP_PORT)
    connection.login(IMAP_USER, IMAP_PASS)
    return connection
def smtp_send_with_attachment(to_addr: str, subject: str, body: str, attachment_path: str):
    """Send a plain-text mail with one ZIP attachment through SMTP with STARTTLS.

    Args:
        to_addr: recipient address.
        subject: value of the Subject header.
        body: plain-text body of the message.
        attachment_path: path of the file to attach; it is attached as
            ``application/zip`` under its base name.
    """
    message = EmailMessage()
    for header_name, header_value in (
        ("From", SMTP_USER),
        ("To", to_addr),
        ("Subject", subject),
    ):
        message[header_name] = header_value
    message.set_content(body)

    with open(attachment_path, "rb") as attachment:
        archive_bytes = attachment.read()
    message.add_attachment(
        archive_bytes,
        maintype="application",
        subtype="zip",
        filename=os.path.basename(attachment_path),
    )

    logging.info("Connecting to SMTP %s:%s", SMTP_HOST, SMTP_PORT)
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
        server.starttls()
        server.login(SMTP_USER, SMTP_PASS)
        server.send_message(message)
    logging.info("Sent email with attachment to %s", to_addr)
def parse_subject(msg) -> str:
    """Return the message subject with RFC 2047 encoded words decoded.

    A missing Subject header yields an empty string. If decoding fails for
    any reason, the raw header value is returned unchanged.
    """
    header_value = msg.get("Subject", "")
    try:
        decoded = make_header(decode_header(header_value))
        return str(decoded)
    except Exception:
        return header_value
def _decode_text_payload(part) -> str:
    """Return the decoded text of one message part, or "" when it has no payload."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # The declared charset is not known to Python. Falling back to UTF-8
        # keeps the (mostly ASCII) URLs readable instead of dropping the part
        # or returning the repr of a bytes object.
        logging.warning("Unknown charset %r, decoding as utf-8", charset)
        return payload.decode("utf-8", errors="ignore")


def _strip_html_tags(html: str) -> str:
    """Replace every ``<...>`` tag with a space (deliberately simple cleanup)."""
    return re.sub(r"<[^>]+>", " ", html)


def extract_text_from_email(msg) -> str:
    """Return the textual content of a message as one string.

    For multipart messages all ``text/plain`` parts are collected. A
    ``text/html`` part is used (with tags stripped) only while no non-blank
    text has been collected yet. For single-part messages the payload is
    decoded whatever its type, and tags are stripped when it is ``text/html``.

    Args:
        msg: a parsed ``email.message.Message`` (or subclass).

    Returns:
        The collected non-blank text fragments joined with newlines; an empty
        string when the message carries no usable text.
    """
    parts = []
    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            if ctype == "text/plain":
                text = _decode_text_payload(part)
            elif ctype == "text/html" and not parts:
                # No plain text so far: fall back to the HTML alternative.
                text = _strip_html_tags(_decode_text_payload(part))
            else:
                continue
            # Blank fragments are skipped so that an empty text/plain part
            # does not suppress the HTML fallback.
            if text.strip():
                parts.append(text)
    else:
        text = _decode_text_payload(msg)
        if msg.get_content_type() == "text/html":
            text = _strip_html_tags(text)
        if text.strip():
            parts.append(text)
    return "\n".join(parts)
def find_urls(text: str) -> List[str]:
    """Return the distinct URLs found in *text*, in order of first appearance.

    URL_RE matches greedily up to the next whitespace, so punctuation that
    merely follows a URL in a sentence ("see https://example.com/a.") or a
    bracket that encloses it ("(https://example.com/a)") would otherwise end
    up inside the URL. Such trailing characters are trimmed. A closing
    bracket is kept when it has an opening partner inside the URL itself.

    Args:
        text: arbitrary text, e.g. the body of an e-mail.

    Returns:
        List of unique URLs; candidates that are nothing but a scheme after
        trimming are dropped.
    """
    openers = {")": "(", "]": "[", "}": "{"}
    unique = {}  # dict keeps insertion order and removes duplicates
    for candidate in URL_RE.findall(text):
        url = candidate
        while url:
            last = url[-1]
            if last in ".,;:!?":
                url = url[:-1]
            elif last in openers and url.count(last) > url.count(openers[last]):
                url = url[:-1]
            else:
                break
        # Skip matches such as "https://..." that have no host part left.
        if url.split("://", 1)[-1]:
            unique[url] = None
    return list(unique)
def choose_downloader_from_text(text: str) -> str:
    """Pick the downloader key requested in the message text.

    The text is searched for a ``downloader: <key>`` or ``downloader=<key>``
    directive (the word "downloader" is matched case-insensitively).

    Returns:
        The requested key when it exists in DOWNLOADERS, otherwise
        DEFAULT_DOWNLOADER. An unknown key is reported with a warning.
    """
    directive = re.search(r"downloader\s*[:=]\s*([A-Za-z0-9_\-]+)", text, flags=re.I)
    if directive is None:
        return DEFAULT_DOWNLOADER

    requested = directive.group(1)
    if requested not in DOWNLOADERS:
        logging.warning("Requested downloader %s not in DOWNLOADERS, using default", requested)
        return DEFAULT_DOWNLOADER
    return requested
def run_downloader_for_url(downloader_key: str, url: str, outdir: str) -> int:
    """Run the configured download command for one URL.

    Args:
        downloader_key: key into DOWNLOADERS selecting the command template.
        url: value substituted for ``{url}`` in the template.
        outdir: working directory of the command and value substituted for
            ``{outdir}``.

    Returns:
        The exit code of the command, or -1 when it could not be run or did
        not finish within 300 seconds.
    """
    argv = [piece.format(url=url, outdir=outdir) for piece in DOWNLOADERS[downloader_key]]
    logging.info("Running downloader: %s", " ".join(argv))
    try:
        completed = subprocess.run(
            argv,
            cwd=outdir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=300,
        )
    except Exception as e:
        logging.exception("Downloader failed: %s", e)
        return -1
    else:
        # Captured output is only emitted at DEBUG level.
        for stream_name, stream in (("stdout", completed.stdout), ("stderr", completed.stderr)):
            logging.debug(stream_name + ": %s", stream.decode(errors="ignore"))
        return completed.returncode
def zip_directory(src_dir: str, zip_path: str):
    """Pack every file below *src_dir* into a deflate-compressed ZIP archive.

    Args:
        src_dir: directory whose files are archived recursively; archive
            member names are relative to this directory.
        zip_path: path of the ZIP file to create (overwritten if present).
    """
    archive = zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED)
    with archive:
        for folder, _subdirs, filenames in os.walk(src_dir):
            for filename in filenames:
                absolute = os.path.join(folder, filename)
                archive.write(absolute, os.path.relpath(absolute, src_dir))
    logging.info("Created zip %s", zip_path)
# --- Processing of a single message ---
def process_email_item(M, num):
    """Download every URL found in one message and mail the result back.

    The message is fetched, its text is scanned for a downloader directive
    and for URLs, each URL is downloaded into its own numbered sub-directory,
    everything is zipped and the archive is sent to the sender of the
    message. The message is flagged ``\\Seen`` afterwards. A failure to send
    the reply is logged but does not prevent the message from being flagged.

    Args:
        M: IMAP connection with the mailbox already selected.
        num: message number as returned by ``M.search``.
    """
    # Imported explicitly: a bare ``import email`` does not by itself
    # guarantee that the ``email.utils`` submodule is available.
    from email.utils import parseaddr

    res, data = M.fetch(num, "(RFC822)")
    if res != "OK":
        logging.error("Failed fetch %s: %s", num, res)
        return
    # NOTE(review): imaplib may answer "OK" without a (header, body) tuple,
    # e.g. when the message vanished in the meantime. Guard the indexing.
    if not data or not isinstance(data[0], tuple):
        logging.error("Failed fetch %s: %s", num, "no message data")
        return

    msg = email.message_from_bytes(data[0][1])
    subject = parse_subject(msg)
    # A missing From header must not reach parseaddr() as None; the reply
    # then goes to the worker's own account.
    frm = parseaddr(str(msg.get("From", "")))[1] or IMAP_USER
    logging.info("Processing mail %s from %s subject=%s", num, frm, subject)

    body_text = extract_text_from_email(msg)
    downloader_key = choose_downloader_from_text(body_text)
    urls = find_urls(body_text)
    if not urls:
        logging.info("No URLs found in message %s — marking seen", num)
        M.store(num, "+FLAGS", "\\Seen")
        return

    # One private working directory holds both the downloads and the archive,
    # so nothing is written under a predictable name in the shared temp
    # directory and everything is removed even when a step raises.
    with tempfile.TemporaryDirectory(prefix="maildl_") as workdir:
        download_root = os.path.join(workdir, "files")
        for i, url in enumerate(urls, start=1):
            sub = os.path.join(download_root, f"{i}")
            os.makedirs(sub, exist_ok=True)
            rc = run_downloader_for_url(downloader_key, url, sub)
            logging.info("Downloader returned %s for %s", rc, url)

        zip_path = os.path.join(workdir, f"download_{int(time.time())}.zip")
        zip_directory(download_root, zip_path)

        # A folded subject contains line breaks, which EmailMessage rejects
        # in header values; collapse all whitespace runs to single spaces.
        subject_reply = " ".join(f"Re: {subject} (downloaded {len(urls)} URLs)".split())
        body_reply = f"Downloaded {len(urls)} URLs using downloader '{downloader_key}'.\nOriginal subject: {subject}\n"
        try:
            smtp_send_with_attachment(frm, subject_reply, body_reply, zip_path)
        except Exception:
            logging.exception("Failed to send reply")

    # Mark the message as read. Optionally it could also be moved:
    #   M.copy(num, "Processed"); M.store(num, "+FLAGS", "\\Deleted")
    M.store(num, "+FLAGS", "\\Seen")
    logging.info("Finished processing mail %s", num)
def main_loop():
    """Poll the mailbox forever and process every matching unread message.

    Each cycle opens a fresh IMAP connection, selects MAILBOX, searches for
    unread messages whose subject contains "url" and hands each one to
    ``process_email_item``. Errors are logged and never stop the loop. The
    connection is always logged out before sleeping CHECK_INTERVAL seconds.
    """
    while True:
        M = None
        try:
            M = imap_connect()
            M.select(MAILBOX)
            # Per IMAP, SUBJECT is a case-insensitive substring match, so any
            # unread message whose subject contains "url" qualifies.
            typ, data = M.search(None, '(UNSEEN SUBJECT "url")')
            if typ != "OK":
                logging.error("Search failed: %s", typ)
            else:
                nums = data[0].split()
                logging.info("Found %d new 'url' messages", len(nums))
                for num in nums:
                    try:
                        process_email_item(M, num)
                    except Exception:
                        # One bad message must not block the remaining ones.
                        logging.exception("Error processing mail %s", num)
        except Exception:
            logging.exception("Main loop exception")
        finally:
            # Log out on every path; otherwise each failed cycle would leave
            # an open connection behind.
            if M is not None:
                try:
                    M.logout()
                except Exception:
                    logging.exception("IMAP logout failed")
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main_loop()
Advertisement
Add Comment
Please, Sign In to add comment