gouyez

gmail_api.py

Nov 23rd, 2025
1,418
1
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 19.18 KB | None | 1 0
  1. """
  2. gmail_api.py — Gmail / People API helpers + OAuth loopback (proxy-capable)
  3.  
  4. - All network calls (OAuth, refresh, Gmail, People) use the same per-account requests.Session
  5.  which can be configured with a proxy (plain host:port or with scheme).
  6. - Exposes facade objects so `service.users().messages().list()/get()/modify()` style
  7.  works with minimal changes to existing plugins.
  8. """
  9.  
  10. import os
  11. import time
  12. import http.server
  13. import threading
  14. import urllib.parse
  15. import socket
  16. import json
  17. from typing import Optional
  18.  
  19. import requests
  20. from core.config import resource_path
  21. from google.oauth2.credentials import Credentials
  22. from google_auth_oauthlib.flow import InstalledAppFlow
  23. from google.auth.transport.requests import Request as GoogleAuthRequest
  24. from google.auth.transport.requests import AuthorizedSession
  25.  
  26. CREDENTIALS_FILE = "credentials.json"
  27. SCOPES = [
  28.     "https://www.googleapis.com/auth/gmail.modify",
  29.     "https://www.googleapis.com/auth/contacts",
  30. ]
  31. CREDENTIALS_PATH = resource_path(CREDENTIALS_FILE)
  32.  
  33. GMAIL_API_BASE = "https://gmail.googleapis.com/gmail/v1"
  34. PEOPLE_API_BASE = "https://people.googleapis.com/v1"
  35.  
  36.  
  37. # ---------- Token storage ----------
  38. def token_path_for(email: str) -> str:
  39.     safe = "".join(c for c in email if c.isalnum() or c in ("@", ".", "_", "-")).replace("@", "_at_")
  40.     return os.path.join("emails", f"{safe}.json")
  41.  
  42.  
  43. # ---------- OAuth callback ----------
  44. class _OAuthHandler(http.server.BaseHTTPRequestHandler):
  45.     def do_GET(self):
  46.         parsed = urllib.parse.urlparse(self.path)
  47.         qs = urllib.parse.parse_qs(parsed.query)
  48.         if parsed.path not in ("/", "/callback"):
  49.             self.send_error(404)
  50.             return
  51.  
  52.         code = qs.get("code", [None])[0]
  53.         error = qs.get("error", [None])[0]
  54.  
  55.         shared = getattr(self.server, "shared", None)
  56.         if shared is None:
  57.             self.send_error(500, "Missing shared context")
  58.             return
  59.  
  60.         if error:
  61.             shared["error"] = error
  62.             html = "<h2>❌ Authorization denied.</h2>"
  63.         elif code:
  64.             shared["code"] = code
  65.             html = (
  66.                 "<html><body style='font-family:sans-serif;text-align:center;padding:40px;'>"
  67.                 "<h2>✅ Authorization complete!</h2>"
  68.                 "<p>You can close this tab. Redirecting to Gmail…</p>"
  69.                 "<script>setTimeout(()=>{location.replace('https://mail.google.com/mail/u/0/#inbox');},2000);</script>"
  70.                 "</body></html>"
  71.             )
  72.         else:
  73.             html = "<h2>⚠️ No authorization code received.</h2>"
  74.  
  75.         self.send_response(200)
  76.         self.send_header("Content-Type", "text/html; charset=utf-8")
  77.         self.end_headers()
  78.         self.wfile.write(html.encode("utf-8"))
  79.         # shutdown server
  80.         threading.Thread(target=self.server.shutdown, daemon=True).start()
  81.  
  82.     def log_message(self, *args):
  83.         return
  84.  
  85.  
  86. class _ThreadedServer(http.server.ThreadingHTTPServer):
  87.     daemon_threads = True
  88.  
  89.  
  90. def _find_free_port():
  91.     s = socket.socket()
  92.     s.bind(("127.0.0.1", 0))
  93.     port = s.getsockname()[1]
  94.     s.close()
  95.     return port
  96.  
  97.  
  98. # ---------- proxy session builder ----------
  99. def _build_proxied_session(proxy: Optional[str] = None) -> requests.Session:
  100.     """
  101.    Build a requests.Session with proxies applied.
  102.    `proxy` may be 'host:port' or 'http://host:port'. We'll set both http and https.
  103.    """
  104.     s = requests.Session()
  105.     if proxy:
  106.         p = str(proxy).strip()
  107.         # ensure scheme
  108.         if "://" not in p:
  109.             proxy_url = f"http://{p}"
  110.         else:
  111.             proxy_url = p
  112.         s.proxies.update({"http": proxy_url, "https": proxy_url})
  113.     # reasonable defaults
  114.     s.headers.update({"User-Agent": "GmailHybridManager/1.0"})
  115.     s.verify = True
  116.     return s
  117.  
  118.  
  119. # ---------- OAuth flow using proxied session ----------
  120. def _oauth_once_with_session(email, sess, log_fn, prefill_email: str = None, proxy: str = None):
  121.     """Run OAuth flow inside the given Chrome session and return credentials.
  122.    - prefill_email: if provided, we try to inject the email into the Google sign-in page input.
  123.    - proxy: accepted for compatibility but not used here (kept for API parity).
  124.    """
  125.     if not os.path.exists(CREDENTIALS_PATH):
  126.         raise FileNotFoundError("Missing credentials.json next to the app.")
  127.  
  128.     port = _find_free_port()
  129.     redirect_uri = f"http://127.0.0.1:{port}/callback"
  130.  
  131.     flow = InstalledAppFlow.from_client_secrets_file(
  132.         str(CREDENTIALS_PATH), SCOPES, redirect_uri=redirect_uri
  133.     )
  134.     auth_url, _ = flow.authorization_url(
  135.         access_type="offline", prompt="consent", include_granted_scopes="true"
  136.     )
  137.  
  138.     # Per-thread shared dict for this OAuth session
  139.     local_shared = {}
  140.  
  141.     # Create threaded HTTP server bound to this dict
  142.     server = _ThreadedServer(("127.0.0.1", port), _OAuthHandler)
  143.     server.shared = local_shared
  144.  
  145.     # Start background server thread
  146.     t = threading.Thread(target=server.serve_forever, daemon=True)
  147.     t.start()
  148.  
  149.     # Wait until port is open before launching Chrome
  150.     deadline = time.time() + 3
  151.     ready = False
  152.     while time.time() < deadline:
  153.         try:
  154.             with socket.create_connection(("127.0.0.1", port), timeout=0.5):
  155.                 ready = True
  156.                 break
  157.         except OSError:
  158.             time.sleep(0.1)
  159.     if not ready:
  160.         raise RuntimeError(f"OAuth callback server failed to start on port {port}")
  161.  
  162.     # Navigate Chrome to authorization URL
  163.     from core.chrome import cdp_navigate
  164.     log_fn(f"[OAUTH] Navigating to consent URL for {email} ...")
  165.     cdp_navigate(sess.ws_url, auth_url, wait_load=False, log_fn=log_fn)
  166.  
  167.     # If we have a prefill_email, attempt to inject it client-side into the sign-in email field.
  168.     # This uses CDP Runtime.evaluate to set the value and dispatch input events.
  169.     if prefill_email and sess and sess.ws_url:
  170.         try:
  171.             # small JS snippet to populate the email input and dispatch events
  172.             js = f"""
  173.            (function() {{
  174.                try {{
  175.                    // Google sign-in uses input[type="email"] (but the DOM can change); try a few selectors.
  176.                    var sel = document.querySelector('input[type="email"], input[id="identifierId"], input[name="identifier"]');
  177.                    if (!sel) return {{ok:false, reason:"no-input"}};
  178.                    sel.focus();
  179.                    sel.value = "{prefill_email.replace('"','\\"')}";
  180.                    var ev = new Event('input', {{bubbles:true}});
  181.                    sel.dispatchEvent(ev);
  182.                    var changeEv = new Event('change', {{bubbles:true}});
  183.                    sel.dispatchEvent(changeEv);
  184.                    return {{ok:true}};
  185.                }} catch(e) {{ return {{ok:false,reason:String(e)}}; }}
  186.            }})();"""
  187.             from core.chrome import cdp_navigate as _noop_nav  # keep import local
  188.             # We'll use the websocket CDP helper directly to evaluate JS.
  189.             from core.chrome import cdp_navigate, ChromeSession  # for typings only
  190.             # Use websocket directly (reuse the cdp_navigate websocket connection pattern)
  191.             # Build a small helper here to send Runtime.evaluate:
  192.             import websocket as _ws
  193.             ws_conn = None
  194.             try:
  195.                 ws_conn = _ws.create_connection(sess.ws_url, timeout=6)
  196.                 # enable runtime
  197.                 ws_conn.send(json.dumps({"id": 1, "method": "Runtime.enable"}))
  198.                 # evaluate
  199.                 cmd_id = 2
  200.                 ws_conn.send(json.dumps({"id": cmd_id, "method": "Runtime.evaluate", "params": {"expression": js, "awaitPromise": True, "returnByValue": True}}))
  201.                 # read a few responses (non-blocking-ish)
  202.                 timeout_deadline = time.time() + 4
  203.                 while time.time() < timeout_deadline:
  204.                     try:
  205.                         raw = ws_conn.recv()
  206.                     except Exception:
  207.                         break
  208.                     try:
  209.                         m = json.loads(raw)
  210.                     except Exception:
  211.                         continue
  212.                     if m.get("id") == cmd_id:
  213.                         # success or not, we stop
  214.                         break
  215.             except Exception as e:
  216.                 log_fn(f"[OAUTH][WARN] prefill email injection failed: {e}")
  217.             finally:
  218.                 try:
  219.                     if ws_conn:
  220.                         ws_conn.close()
  221.                 except Exception:
  222.                     pass
  223.         except Exception as e:
  224.             log_fn(f"[OAUTH][WARN] prefill attempt failed: {e}")
  225.  
  226.     log_fn(f"[OAUTH] waiting for callback on port {port} for {email} …")
  227.  
  228.     # Wait for callback
  229.     deadline = time.time() + 600
  230.     while time.time() < deadline:
  231.         if "code" in local_shared:
  232.             flow.fetch_token(code=local_shared["code"])
  233.             creds = flow.credentials
  234.             tok_path = token_path_for(email)
  235.             os.makedirs(os.path.dirname(tok_path), exist_ok=True)
  236.             with open(tok_path, "w", encoding="utf-8") as f:
  237.                 f.write(creds.to_json())
  238.             log_fn(f"[TOKEN] saved for {email}: {tok_path}")
  239.             return creds, True
  240.  
  241.         if "error" in local_shared:
  242.             raise RuntimeError(f"OAuth error for {email}: {local_shared['error']}")
  243.  
  244.         time.sleep(0.25)
  245.  
  246.     raise TimeoutError(f"Timed out waiting for OAuth callback for {email}")
  247.  
  248.  
  249. def oauth_first_login_in_session(email, sess, log_fn, also_open_gmail_ui=False, prefill_email: str = None, proxy: str = None):
  250.     """
  251.    Public wrapper that accepts optional prefill_email and proxy (for API parity).
  252.    """
  253.     try:
  254.         return _oauth_once_with_session(email, sess, log_fn, prefill_email=prefill_email, proxy=proxy)
  255.     except Exception as e:
  256.         log_fn(f"[OAuth] first attempt failed: {e}; retrying …")
  257.         time.sleep(2)
  258.         return _oauth_once_with_session(email, sess, log_fn, prefill_email=prefill_email, proxy=proxy)
  259.  
  260. # ---------- credentials load/refresh (proxy-aware) ----------
  261. def load_credentials_for(email, log_fn, proxy: Optional[str] = None):
  262.     tok = token_path_for(email)
  263.     creds = None
  264.  
  265.     if os.path.exists(tok):
  266.         try:
  267.             creds = Credentials.from_authorized_user_file(tok, SCOPES)
  268.         except Exception as e:
  269.             log_fn(f"[TOKEN] parse failed for {email}: {e}")
  270.             creds = None
  271.  
  272.     if creds and creds.valid:
  273.         log_fn("[TOKEN] valid")
  274.         return creds
  275.  
  276.     if creds and creds.expired and creds.refresh_token:
  277.         log_fn("[TOKEN] expired -> refreshing")
  278.         try:
  279.             proxied_session = _build_proxied_session(proxy)
  280.             auth_req = GoogleAuthRequest(session=proxied_session)
  281.             creds.refresh(auth_req)
  282.             with open(tok, "w", encoding="utf-8") as f:
  283.                 f.write(creds.to_json())
  284.             log_fn("[TOKEN] refreshed")
  285.             return creds
  286.         except Exception as e:
  287.             log_fn(f"[TOKEN] refresh failed: {e}")
  288.             raise
  289.  
  290.     raise RuntimeError("No valid token for this account.")
  291.  
  292.  
  293. # ---------- low-level proxy apply helper for AuthorizedSession ----------
  294. def _apply_proxies_to_authorized_session(authed_session: AuthorizedSession, session: requests.Session):
  295.     """
  296.    Different google-auth versions expose underlying requests session differently.
  297.    Try known attributes safely.
  298.    """
  299.     try:
  300.         if hasattr(authed_session, "session") and isinstance(authed_session.session, requests.Session):
  301.             authed_session.session.proxies.update(session.proxies)
  302.         elif hasattr(authed_session, "_session") and isinstance(authed_session._session, requests.Session):
  303.             authed_session._session.proxies.update(session.proxies)
  304.         else:
  305.             # fallback: try to set attribute directly
  306.             if hasattr(authed_session, "proxies"):
  307.                 authed_session.proxies.update(session.proxies)
  308.     except Exception:
  309.         # not fatal
  310.         pass
  311.  
  312.  
  313. # ---------- Gmail service facade (proxy-capable) ----------
  314. class GmailServiceProxy:
  315.     def __init__(self, creds: Credentials, session: Optional[requests.Session] = None):
  316.         # AuthorizedSession will sign requests with creds
  317.         self._authed = AuthorizedSession(creds)
  318.         if session:
  319.             _apply_proxies_to_authorized_session(self._authed, session)
  320.  
  321.     # direct convenience wrappers (used by previous helper functions)
  322.     def list_messages(self, query, max_results=500, log_fn=None):
  323.         messages = []
  324.         page_token = None
  325.         fetched = 0
  326.         while True:
  327.             params = {"q": query, "maxResults": 100, "includeSpamTrash": "true"}
  328.             if page_token:
  329.                 params["pageToken"] = page_token
  330.             url = f"{GMAIL_API_BASE}/users/me/messages"
  331.             try:
  332.                 r = self._authed.get(url, params=params, timeout=30)
  333.                 r.raise_for_status()
  334.                 data = r.json()
  335.                 if data.get("messages"):
  336.                     messages.extend(data["messages"])
  337.                     fetched += len(data["messages"])
  338.                     if log_fn:
  339.                         log_fn(f"[SEARCH] {len(messages)} found so far…")
  340.                 page_token = data.get("nextPageToken")
  341.                 if not page_token or fetched >= max_results:
  342.                     break
  343.             except Exception as e:
  344.                 if log_fn:
  345.                     log_fn(f"[SEARCH][ERROR] {e}")
  346.                 break
  347.         return messages[:max_results]
  348.  
  349.     def get_message_full(self, msg_id):
  350.         url = f"{GMAIL_API_BASE}/users/me/messages/{msg_id}"
  351.         r = self._authed.get(url, params={"format": "full"}, timeout=30)
  352.         r.raise_for_status()
  353.         return r.json()
  354.  
  355.     def modify_message(self, msg_id, remove_label_ids=None, add_label_ids=None):
  356.         url = f"{GMAIL_API_BASE}/users/me/messages/{msg_id}/modify"
  357.         body = {}
  358.         if remove_label_ids:
  359.             body["removeLabelIds"] = remove_label_ids
  360.         if add_label_ids:
  361.             body["addLabelIds"] = add_label_ids
  362.         r = self._authed.post(url, json=body, timeout=30)
  363.         r.raise_for_status()
  364.         return r.json()
  365.  
  366.     # Provide googleapiclient-like chain: users().messages().list()/get()/modify()
  367.     def users(self):
  368.         return _UsersFacade(self)
  369.  
  370.  
  371. class _UsersFacade:
  372.     def __init__(self, parent: GmailServiceProxy):
  373.         self._parent = parent
  374.  
  375.     def messages(self):
  376.         return _MessagesFacade(self._parent)
  377.  
  378.  
  379. class _MessagesFacade:
  380.     def __init__(self, parent: GmailServiceProxy):
  381.         self._parent = parent
  382.  
  383.     def list(self, userId="me", q=None, maxResults=100, includeSpamTrash=True):
  384.         # return an object with execute() to match older googleapiclient usage
  385.         params = {"q": q or "", "maxResults": maxResults, "includeSpamTrash": str(includeSpamTrash).lower()}
  386.         class ExecObj:
  387.             def __init__(self, parent, params):
  388.                 self._parent = parent
  389.                 self._params = params
  390.             def execute(self):
  391.                 # call parent's list_messages and emulate googleapiclient response
  392.                 msgs = self._parent.list_messages(self._params.get("q", ""), max_results=self._params.get("maxResults", 100))
  393.                 # googleapiclient returns dict with 'messages' maybe empty and possible nextPageToken
  394.                 return {"messages": msgs}
  395.         return ExecObj(self._parent, params)
  396.  
  397.     def get(self, userId="me", id=None, format="full"):
  398.         class ExecObj:
  399.             def __init__(self, parent, msg_id):
  400.                 self._parent = parent
  401.                 self._msg_id = msg_id
  402.             def execute(self):
  403.                 return self._parent.get_message_full(self._msg_id)
  404.         return ExecObj(self._parent, id)
  405.  
  406.     def modify(self, userId="me", id=None, body=None):
  407.         class ExecObj:
  408.             def __init__(self, parent, msg_id, body):
  409.                 self._parent = parent
  410.                 self._msg_id = msg_id
  411.                 self._body = body or {}
  412.             def execute(self):
  413.                 remove = self._body.get("removeLabelIds")
  414.                 add = self._body.get("addLabelIds")
  415.                 return self._parent.modify_message(self._msg_id, remove_label_ids=remove, add_label_ids=add)
  416.         return ExecObj(self._parent, id, body)
  417.  
  418.  
  419. # ---------- People API facade ----------
  420. class PeopleServiceProxy:
  421.     def __init__(self, creds: Credentials, session: Optional[requests.Session] = None):
  422.         self._authed = AuthorizedSession(creds)
  423.         if session:
  424.             _apply_proxies_to_authorized_session(self._authed, session)
  425.  
  426.     def create_contact(self, email_address):
  427.         url = f"{PEOPLE_API_BASE}/people:createContact"
  428.         body = {"emailAddresses": [{"value": email_address}]}
  429.         r = self._authed.post(url, json=body, timeout=30)
  430.         r.raise_for_status()
  431.         return r.json()
  432.  
  433.     # googleapiclient-like facade: people().createContact()
  434.     def people(self):
  435.         return _PeopleFacade(self)
  436.  
  437.  
  438. class _PeopleFacade:
  439.     def __init__(self, parent: PeopleServiceProxy):
  440.         self._parent = parent
  441.  
  442.     def createContact(self, body=None):
  443.         class ExecObj:
  444.             def __init__(self, parent, body):
  445.                 self._parent = parent
  446.                 self._body = body or {}
  447.             def execute(self):
  448.                 emails = []
  449.                 # accept either body param style or direct email string in convenience call
  450.                 if isinstance(self._body, str):
  451.                     emails = [self._body]
  452.                 else:
  453.                     emails = [e.get("value") for e in (self._body.get("emailAddresses") or [])]
  454.                 # if multiple, create each; here we pick first
  455.                 if not emails:
  456.                     raise RuntimeError("No email provided to createContact")
  457.                 return self._parent.create_contact(emails[0])
  458.         return ExecObj(self._parent, body)
  459.  
  460.  
  461. # ---------- builders ----------
  462. def build_gmail_service(creds: Credentials, proxy: Optional[str] = None):
  463.     session = _build_proxied_session(proxy)
  464.     return GmailServiceProxy(creds, session)
  465.  
  466.  
  467. def build_people_service(creds: Credentials, proxy: Optional[str] = None):
  468.     session = _build_proxied_session(proxy)
  469.     return PeopleServiceProxy(creds, session)
  470.  
  471.  
  472. # ---------- backward-compatible helpers used previously ----------
  473. def search_messages(service_proxy: GmailServiceProxy, query, max_results=500, log_fn=None):
  474.     return service_proxy.list_messages(query, max_results=max_results, log_fn=log_fn)
  475.  
  476.  
  477. def get_message_full(service_proxy: GmailServiceProxy, msg_id):
  478.     return service_proxy.get_message_full(msg_id)
  479.  
  480.  
  481. def mark_as_read(service_proxy: GmailServiceProxy, msg_id, log_fn=None):
  482.     try:
  483.         service_proxy.modify_message(msg_id, remove_label_ids=["UNREAD"])
  484.         if log_fn:
  485.             log_fn(f"[GMAIL] marked as read: {msg_id}")
  486.         return True
  487.     except Exception as e:
  488.         if log_fn:
  489.             log_fn(f"[GMAIL][ERROR] mark_as_read({msg_id}): {e}")
  490.         return False
  491.  
Advertisement