# Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
"""
gmail_api.py — Gmail / People API helpers + OAuth loopback (proxy-capable)

- All network calls (OAuth, refresh, Gmail, People) use the same per-account
  requests.Session, which can be configured with a proxy (plain host:port or
  with scheme).
- Exposes facade objects so `service.users().messages().list()/get()/modify()`
  style calls work with minimal changes to existing plugins.
"""
- import os
- import time
- import http.server
- import threading
- import urllib.parse
- import socket
- import json
- from typing import Optional
- import requests
- from core.config import resource_path
- from google.oauth2.credentials import Credentials
- from google_auth_oauthlib.flow import InstalledAppFlow
- from google.auth.transport.requests import Request as GoogleAuthRequest
- from google.auth.transport.requests import AuthorizedSession
# File name of the OAuth client-secrets file.
CREDENTIALS_FILE = "credentials.json"

# OAuth scopes requested during consent and when loading stored tokens.
SCOPES = [
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/contacts",
]

# Client-secrets location as resolved by core.config.resource_path.
CREDENTIALS_PATH = resource_path(CREDENTIALS_FILE)

# REST base URLs used by the service facades below.
GMAIL_API_BASE = "https://gmail.googleapis.com/gmail/v1"
PEOPLE_API_BASE = "https://people.googleapis.com/v1"
# ---------- Token storage ----------
def token_path_for(email: str) -> str:
    """Return the relative token-file path for ``email``.

    Only alphanumeric characters and ``@ . _ -`` are kept from the address;
    ``@`` is then spelled ``_at_``. The file is placed in the ``emails``
    directory with a ``.json`` suffix.
    """
    kept_punctuation = {"@", ".", "_", "-"}
    kept_chars = [ch for ch in email if ch.isalnum() or ch in kept_punctuation]
    file_stem = "".join(kept_chars).replace("@", "_at_")
    return os.path.join("emails", file_stem + ".json")
# ---------- OAuth callback ----------
class _OAuthHandler(http.server.BaseHTTPRequestHandler):
    """Loopback HTTP handler that captures the OAuth redirect.

    The owning server must expose a ``shared`` dict. The handler stores the
    ``code`` or ``error`` query parameter of the redirect in that dict and
    then asks the server to shut down.
    """

    def do_GET(self):
        """Handle the redirect on ``/`` or ``/callback``.

        Any other path is answered with 404. A missing ``shared`` dict on the
        server is answered with 500.
        """
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path not in ("/", "/callback"):
            self.send_error(404)
            return

        shared = getattr(self.server, "shared", None)
        if shared is None:
            self.send_error(500, "Missing shared context")
            return

        qs = urllib.parse.parse_qs(parsed.query)
        code = qs.get("code", [None])[0]
        error = qs.get("error", [None])[0]

        if error:
            shared["error"] = error
            html = "<h2>❌ Authorization denied.</h2>"
        elif code:
            shared["code"] = code
            html = (
                "<html><body style='font-family:sans-serif;text-align:center;padding:40px;'>"
                "<h2>✅ Authorization complete!</h2>"
                "<p>You can close this tab. Redirecting to Gmail…</p>"
                "<script>setTimeout(()=>{location.replace('https://mail.google.com/mail/u/0/#inbox');},2000);</script>"
                "</body></html>"
            )
        else:
            html = "<h2>⚠️ No authorization code received.</h2>"

        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.end_headers()
        self.wfile.write(html.encode("utf-8"))

        # Only stop the server once a result was captured. A request that
        # carries neither a code nor an error must not stop the server,
        # otherwise the real redirect can no longer be received.
        if error or code:
            # shutdown() blocks until serve_forever() exits, so run it on a
            # separate thread instead of the request thread.
            threading.Thread(target=self.server.shutdown, daemon=True).start()

    def log_message(self, *args):
        """Suppress the default per-request logging."""
        return
class _ThreadedServer(http.server.ThreadingHTTPServer):
    """Threading HTTP server whose request threads are marked as daemon threads."""

    daemon_threads = True
def _find_free_port():
    """Return a TCP port number that was free on 127.0.0.1 when probed.

    Binds a temporary socket to port 0 so the OS picks the port, reads the
    number back and releases the socket. The socket is closed even if
    binding fails.

    Note: the port is released before the caller binds it, so another
    process could still claim it in between.
    """
    with socket.socket() as probe:
        probe.bind(("127.0.0.1", 0))
        return probe.getsockname()[1]
# ---------- proxy session builder ----------
def _build_proxied_session(proxy: Optional[str] = None) -> requests.Session:
    """Build a requests.Session with an optional proxy applied.

    Args:
        proxy: ``'host:port'`` or a full URL such as ``'http://host:port'``.
            A value without a scheme gets ``http://`` prepended. ``None``,
            an empty string or a whitespace-only string means "no proxy".

    Returns:
        A new session with the User-Agent header set and TLS verification
        enabled. When a proxy is given it is used for both http and https.
    """
    session = requests.Session()
    # reasonable defaults
    session.headers.update({"User-Agent": "GmailHybridManager/1.0"})
    session.verify = True

    # Strip first so a blank value cannot turn into the bogus proxy "http://".
    proxy_text = str(proxy).strip() if proxy else ""
    if proxy_text:
        proxy_url = proxy_text if "://" in proxy_text else f"http://{proxy_text}"
        session.proxies.update({"http": proxy_url, "https": proxy_url})
    return session
# ---------- OAuth flow using proxied session ----------
def _wait_until_listening(port, timeout):
    """Return True once 127.0.0.1:``port`` accepts a TCP connection, False after ``timeout`` seconds."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=0.5):
                return True
        except OSError:
            time.sleep(0.1)
    return False


def _prefill_email_via_cdp(ws_url, prefill_email, log_fn):
    """Best-effort: put ``prefill_email`` into the sign-in email input via CDP Runtime.evaluate.

    Never raises; every failure is reported through ``log_fn`` as a warning.
    """
    try:
        import websocket as _ws  # keep import local

        # json.dumps yields a correctly escaped JS string literal (quotes,
        # backslashes, newlines), so the address cannot break out of it.
        email_literal = json.dumps(prefill_email)
        # small JS snippet to populate the email input and dispatch events
        js = f"""
(function() {{
  try {{
    // Google sign-in uses input[type="email"] (but the DOM can change); try a few selectors.
    var sel = document.querySelector('input[type="email"], input[id="identifierId"], input[name="identifier"]');
    if (!sel) return {{ok:false, reason:"no-input"}};
    sel.focus();
    sel.value = {email_literal};
    var ev = new Event('input', {{bubbles:true}});
    sel.dispatchEvent(ev);
    var changeEv = new Event('change', {{bubbles:true}});
    sel.dispatchEvent(changeEv);
    return {{ok:true}};
  }} catch(e) {{ return {{ok:false,reason:String(e)}}; }}
}})();"""

        ws_conn = None
        try:
            ws_conn = _ws.create_connection(ws_url, timeout=6)
            # enable runtime
            ws_conn.send(json.dumps({"id": 1, "method": "Runtime.enable"}))
            # evaluate
            cmd_id = 2
            ws_conn.send(json.dumps({
                "id": cmd_id,
                "method": "Runtime.evaluate",
                "params": {"expression": js, "awaitPromise": True, "returnByValue": True},
            }))
            # Read responses until the reply to our evaluate command shows up
            # or the time budget is spent.
            timeout_deadline = time.time() + 4
            while time.time() < timeout_deadline:
                try:
                    raw = ws_conn.recv()
                except Exception:
                    break
                try:
                    message = json.loads(raw)
                except (TypeError, ValueError):
                    continue
                if message.get("id") == cmd_id:
                    # success or not, we stop
                    break
        except Exception as e:
            log_fn(f"[OAUTH][WARN] prefill email injection failed: {e}")
        finally:
            if ws_conn:
                try:
                    ws_conn.close()
                except Exception:
                    pass
    except Exception as e:
        log_fn(f"[OAUTH][WARN] prefill attempt failed: {e}")


def _oauth_once_with_session(email, sess, log_fn, prefill_email: str = None, proxy: str = None):
    """Run OAuth flow inside the given Chrome session and return credentials.

    A loopback HTTP server is started on a free port, the Chrome session is
    navigated to the consent URL, and the function polls for the redirect for
    up to 600 seconds. On success the token is written to
    ``token_path_for(email)``. The callback server is always shut down and
    closed before returning or raising.

    Args:
        email: account the token is stored for.
        sess: Chrome session object; its ``ws_url`` attribute is used.
        log_fn: callable taking one message string.
        prefill_email: if provided, we try to inject the email into the
            Google sign-in page input.
        proxy: accepted for compatibility but not used here (kept for API parity).

    Returns:
        ``(creds, True)``.

    Raises:
        FileNotFoundError: the client-secrets file does not exist.
        RuntimeError: the callback server did not start, or the redirect
            carried an ``error`` parameter.
        TimeoutError: no redirect arrived within 600 seconds.
    """
    if not os.path.exists(CREDENTIALS_PATH):
        raise FileNotFoundError("Missing credentials.json next to the app.")

    port = _find_free_port()
    redirect_uri = f"http://127.0.0.1:{port}/callback"
    flow = InstalledAppFlow.from_client_secrets_file(
        str(CREDENTIALS_PATH), SCOPES, redirect_uri=redirect_uri
    )
    auth_url, _ = flow.authorization_url(
        access_type="offline", prompt="consent", include_granted_scopes="true"
    )

    # Per-call dict the callback handler writes the result into.
    local_shared = {}
    server = _ThreadedServer(("127.0.0.1", port), _OAuthHandler)
    server.shared = local_shared
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
    server_thread.start()

    try:
        # Wait until port is open before launching Chrome
        if not _wait_until_listening(port, timeout=3):
            raise RuntimeError(f"OAuth callback server failed to start on port {port}")

        # Navigate Chrome to authorization URL
        from core.chrome import cdp_navigate
        log_fn(f"[OAUTH] Navigating to consent URL for {email} ...")
        cdp_navigate(sess.ws_url, auth_url, wait_load=False, log_fn=log_fn)

        if prefill_email and sess and sess.ws_url:
            _prefill_email_via_cdp(sess.ws_url, prefill_email, log_fn)

        log_fn(f"[OAUTH] waiting for callback on port {port} for {email} …")
        # Wait for callback
        deadline = time.time() + 600
        while time.time() < deadline:
            if "code" in local_shared:
                flow.fetch_token(code=local_shared["code"])
                creds = flow.credentials
                tok_path = token_path_for(email)
                os.makedirs(os.path.dirname(tok_path), exist_ok=True)
                with open(tok_path, "w", encoding="utf-8") as f:
                    f.write(creds.to_json())
                log_fn(f"[TOKEN] saved for {email}: {tok_path}")
                return creds, True
            if "error" in local_shared:
                raise RuntimeError(f"OAuth error for {email}: {local_shared['error']}")
            time.sleep(0.25)
        raise TimeoutError(f"Timed out waiting for OAuth callback for {email}")
    finally:
        # Stop the accept loop (a no-op wait if the handler already did) and
        # release the listening socket, whatever the outcome.
        server.shutdown()
        server.server_close()
def oauth_first_login_in_session(email, sess, log_fn, also_open_gmail_ui=False, prefill_email: str = None, proxy: str = None):
    """Run the OAuth flow for ``email``, retrying once after a failure.

    Public wrapper that accepts optional prefill_email and proxy (for API
    parity). ``also_open_gmail_ui`` is accepted but not used by this function.
    A failure of the second attempt propagates to the caller.
    """
    forwarded = {"prefill_email": prefill_email, "proxy": proxy}
    try:
        return _oauth_once_with_session(email, sess, log_fn, **forwarded)
    except Exception as first_error:
        log_fn(f"[OAuth] first attempt failed: {first_error}; retrying …")
        # Short pause before the single retry.
        time.sleep(2)
        return _oauth_once_with_session(email, sess, log_fn, **forwarded)
# ---------- credentials load/refresh (proxy-aware) ----------
def load_credentials_for(email, log_fn, proxy: Optional[str] = None):
    """Load the stored credentials for ``email``, refreshing them when expired.

    Args:
        email: account whose token file (``token_path_for(email)``) is read.
        log_fn: callable taking one message string.
        proxy: optional proxy used for the refresh request.

    Returns:
        The valid, or freshly refreshed and re-saved, credentials.

    Raises:
        RuntimeError: no token file, an unparsable one, or credentials that
            are neither valid nor refreshable.
        Exception: whatever the refresh or the re-save raised (logged first).
    """
    token_file = token_path_for(email)

    creds = None
    if os.path.exists(token_file):
        try:
            creds = Credentials.from_authorized_user_file(token_file, SCOPES)
        except Exception as parse_error:
            log_fn(f"[TOKEN] parse failed for {email}: {parse_error}")

    if creds:
        if creds.valid:
            log_fn("[TOKEN] valid")
            return creds

        if creds.expired and creds.refresh_token:
            log_fn("[TOKEN] expired -> refreshing")
            try:
                # The refresh request goes through the per-account session.
                refresher = GoogleAuthRequest(session=_build_proxied_session(proxy))
                creds.refresh(refresher)
                with open(token_file, "w", encoding="utf-8") as handle:
                    handle.write(creds.to_json())
                log_fn("[TOKEN] refreshed")
                return creds
            except Exception as refresh_error:
                log_fn(f"[TOKEN] refresh failed: {refresh_error}")
                raise

    raise RuntimeError("No valid token for this account.")
# ---------- low-level proxy apply helper for AuthorizedSession ----------
def _apply_proxies_to_authorized_session(authed_session: AuthorizedSession, session: requests.Session):
    """Copy ``session.proxies`` onto the requests session behind ``authed_session``.

    Different google-auth versions expose the underlying requests session
    differently, so the candidates are tried in order: a ``session``
    attribute, a ``_session`` attribute, then ``authed_session.proxies``
    itself. Any failure is ignored; applying proxies is best-effort.
    """
    try:
        for attr_name in ("session", "_session"):
            inner = getattr(authed_session, attr_name, None)
            if isinstance(inner, requests.Session):
                inner.proxies.update(session.proxies)
                return
        # fallback: update the proxies mapping on the object itself
        if hasattr(authed_session, "proxies"):
            authed_session.proxies.update(session.proxies)
    except Exception:
        # not fatal
        pass
# ---------- Gmail service facade (proxy-capable) ----------
class GmailServiceProxy:
    """Thin Gmail REST client built on google-auth's AuthorizedSession.

    Offers direct helpers (``list_messages``, ``get_message_full``,
    ``modify_message``) and a googleapiclient-like chain through ``users()``.
    """

    def __init__(self, creds: Credentials, session: Optional[requests.Session] = None):
        """Create the signed session.

        Args:
            creds: credentials used to sign every request.
            session: optional per-account session whose proxies are applied.
                It is also used for token refreshes, so those requests go
                through the same proxy as the API calls.
        """
        if session is not None:
            # Without an explicit auth_request, AuthorizedSession refreshes
            # tokens over a private session that knows nothing of the proxy.
            self._authed = AuthorizedSession(
                creds, auth_request=GoogleAuthRequest(session=session)
            )
            _apply_proxies_to_authorized_session(self._authed, session)
        else:
            # AuthorizedSession will sign requests with creds
            self._authed = AuthorizedSession(creds)

    # direct convenience wrappers (used by previous helper functions)
    def list_messages(self, query, max_results=500, log_fn=None):
        """Return up to ``max_results`` message stubs matching ``query``.

        Pages through ``users/me/messages`` 100 results at a time, with spam
        and trash included. A failed page request is reported through
        ``log_fn`` (when given) and ends the paging; the messages collected
        so far are still returned.
        """
        url = f"{GMAIL_API_BASE}/users/me/messages"
        messages = []
        page_token = None
        while True:
            params = {"q": query, "maxResults": 100, "includeSpamTrash": "true"}
            if page_token:
                params["pageToken"] = page_token
            try:
                r = self._authed.get(url, params=params, timeout=30)
                r.raise_for_status()
                data = r.json()
                if data.get("messages"):
                    messages.extend(data["messages"])
                if log_fn:
                    log_fn(f"[SEARCH] {len(messages)} found so far…")
                page_token = data.get("nextPageToken")
                if not page_token or len(messages) >= max_results:
                    break
            except Exception as e:
                if log_fn:
                    log_fn(f"[SEARCH][ERROR] {e}")
                break
        return messages[:max_results]

    def get_message_full(self, msg_id):
        """Fetch one message with ``format=full`` and return the decoded JSON.

        Raises the HTTP error from ``raise_for_status`` on a failed request.
        """
        url = f"{GMAIL_API_BASE}/users/me/messages/{msg_id}"
        r = self._authed.get(url, params={"format": "full"}, timeout=30)
        r.raise_for_status()
        return r.json()

    def modify_message(self, msg_id, remove_label_ids=None, add_label_ids=None):
        """Add and/or remove labels on a message and return the decoded JSON.

        Only the label lists that are non-empty are sent in the request body.
        Raises the HTTP error from ``raise_for_status`` on a failed request.
        """
        url = f"{GMAIL_API_BASE}/users/me/messages/{msg_id}/modify"
        body = {}
        if remove_label_ids:
            body["removeLabelIds"] = remove_label_ids
        if add_label_ids:
            body["addLabelIds"] = add_label_ids
        r = self._authed.post(url, json=body, timeout=30)
        r.raise_for_status()
        return r.json()

    # Provide googleapiclient-like chain: users().messages().list()/get()/modify()
    def users(self):
        """Return the facade that exposes ``messages()``."""
        return _UsersFacade(self)
class _UsersFacade:
    """``users()`` level of the googleapiclient-like call chain."""

    def __init__(self, parent: GmailServiceProxy):
        # Service object that performs the actual HTTP calls.
        self._parent = parent

    def messages(self):
        """Return the ``messages()`` level, bound to the same service object."""
        service = self._parent
        return _MessagesFacade(service)
class _MessageCall:
    """Deferred request exposing ``execute()`` like a googleapiclient request object."""

    def __init__(self, action):
        self._action = action

    def execute(self):
        """Perform the deferred call and return its result."""
        return self._action()


class _MessagesFacade:
    """``messages()`` level of the chain: ``list()``, ``get()`` and ``modify()``.

    Each method returns an object whose ``execute()`` performs the call, to
    match older googleapiclient usage.
    """

    def __init__(self, parent: GmailServiceProxy):
        self._parent = parent

    def list(self, userId="me", q=None, maxResults=100, includeSpamTrash=True):
        """Deferred search; ``execute()`` returns ``{"messages": [...]}``.

        Only ``q`` and ``maxResults`` are forwarded to the service's
        ``list_messages``; ``userId`` and ``includeSpamTrash`` are accepted
        but not forwarded.
        """
        service = self._parent
        query = q or ""

        def run_search():
            found = service.list_messages(query, max_results=maxResults)
            return {"messages": found}

        return _MessageCall(run_search)

    def get(self, userId="me", id=None, format="full"):
        """Deferred fetch of message ``id`` through the service's ``get_message_full``.

        ``userId`` and ``format`` are accepted but not forwarded.
        """
        service = self._parent
        message_id = id
        return _MessageCall(lambda: service.get_message_full(message_id))

    def modify(self, userId="me", id=None, body=None):
        """Deferred label change; ``body`` may hold ``removeLabelIds`` / ``addLabelIds``."""
        service = self._parent
        message_id = id
        payload = body or {}

        def run_modify():
            # Label lists are read when execute() runs, not when modify() is called.
            return service.modify_message(
                message_id,
                remove_label_ids=payload.get("removeLabelIds"),
                add_label_ids=payload.get("addLabelIds"),
            )

        return _MessageCall(run_modify)
# ---------- People API facade ----------
class PeopleServiceProxy:
    """Thin People API client built on google-auth's AuthorizedSession."""

    def __init__(self, creds: Credentials, session: Optional[requests.Session] = None):
        """Create the signed session.

        Args:
            creds: credentials used to sign every request.
            session: optional per-account session whose proxies are applied.
                It is also used for token refreshes, so those requests go
                through the same proxy as the API calls.
        """
        if session is not None:
            # Without an explicit auth_request, AuthorizedSession refreshes
            # tokens over a private session that knows nothing of the proxy.
            self._authed = AuthorizedSession(
                creds, auth_request=GoogleAuthRequest(session=session)
            )
            _apply_proxies_to_authorized_session(self._authed, session)
        else:
            self._authed = AuthorizedSession(creds)

    def create_contact(self, email_address):
        """Create a contact holding a single email address; return the decoded JSON.

        Raises the HTTP error from ``raise_for_status`` on a failed request.
        """
        url = f"{PEOPLE_API_BASE}/people:createContact"
        body = {"emailAddresses": [{"value": email_address}]}
        r = self._authed.post(url, json=body, timeout=30)
        r.raise_for_status()
        return r.json()

    # googleapiclient-like facade: people().createContact()
    def people(self):
        """Return the facade that exposes ``createContact()``."""
        return _PeopleFacade(self)
class _PeopleFacade:
    """``people()`` level of the googleapiclient-like chain."""

    def __init__(self, parent: "PeopleServiceProxy"):
        # Service object that performs the actual HTTP call.
        self._parent = parent

    def createContact(self, body=None):
        """Return an object whose ``execute()`` creates the contact.

        Args:
            body: either a People-API style dict with an ``emailAddresses``
                list of ``{"value": ...}`` entries, or a plain email string.

        ``execute()`` uses the first non-empty address and raises
        ``RuntimeError`` when there is none.
        """
        class ExecObj:
            def __init__(self, parent, body):
                self._parent = parent
                self._body = body or {}

            def execute(self):
                # accept either body param style or direct email string in convenience call
                if isinstance(self._body, str):
                    candidates = [self._body]
                else:
                    candidates = [e.get("value") for e in (self._body.get("emailAddresses") or [])]
                # Drop entries without a value so a missing or empty address is
                # reported here instead of being sent to the API as null.
                emails = [value for value in candidates if value]
                # if multiple, create each; here we pick first
                if not emails:
                    raise RuntimeError("No email provided to createContact")
                return self._parent.create_contact(emails[0])

        return ExecObj(self._parent, body)
# ---------- builders ----------
def build_gmail_service(creds: Credentials, proxy: Optional[str] = None):
    """Return a GmailServiceProxy for ``creds`` with the optional ``proxy`` applied."""
    return GmailServiceProxy(creds, _build_proxied_session(proxy))
def build_people_service(creds: Credentials, proxy: Optional[str] = None):
    """Return a PeopleServiceProxy for ``creds`` with the optional ``proxy`` applied."""
    return PeopleServiceProxy(creds, _build_proxied_session(proxy))
# ---------- backward-compatible helpers used previously ----------
def search_messages(service_proxy: GmailServiceProxy, query, max_results=500, log_fn=None):
    """Function-style wrapper around ``service_proxy.list_messages``."""
    found = service_proxy.list_messages(query, max_results=max_results, log_fn=log_fn)
    return found
def get_message_full(service_proxy: GmailServiceProxy, msg_id):
    """Function-style wrapper around ``service_proxy.get_message_full``."""
    message = service_proxy.get_message_full(msg_id)
    return message
def mark_as_read(service_proxy: GmailServiceProxy, msg_id, log_fn=None):
    """Remove the ``UNREAD`` label from ``msg_id``.

    Returns True on success and False when anything raised; the outcome is
    reported through ``log_fn`` when one is given.
    """
    def report(message):
        if log_fn:
            log_fn(message)

    try:
        service_proxy.modify_message(msg_id, remove_label_ids=["UNREAD"])
        report(f"[GMAIL] marked as read: {msg_id}")
    except Exception as exc:
        report(f"[GMAIL][ERROR] mark_as_read({msg_id}): {exc}")
        return False
    return True
# Advertisement