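# NOTE: this file was copied from a Pastebin page. The site banner text
# ("Not a member of Pastebin yet? Sign Up, it unlocks many cool features!")
# is page chrome and not part of the script.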
# Fiddly stuff by SpambotSwatter
# Found by watching the requests that new.reddit.com actions make in the browser's F12 debug console
import base64
import json
import pickle
import re
import time
import urllib

import requests
class Authorizer():
    """Plain holder for the credentials used when logging in.

    All fields default to ``None`` at class level; a ``Reddit`` instance
    assigns the real values onto its own ``Authorizer`` object.
    """

    # Account name; a falsy value makes Reddit.Login treat the session as anonymous.
    _username: "str | None" = None
    # Account password posted to the login form.
    _password: "str | None" = None
    # Optional zero-argument callable returning the one-time password ("otp" form field).
    _two_factor_callback = None
class Tokens():
    """Plain holder for the tokens scraped from Reddit pages.

    Instances are pickled together with the session cookies, so the
    attribute names are part of the on-disk cookie-file format.
    """

    # Bearer token sent in the Authorization header of GQL/OAuth requests.
    accessToken: "str | None" = None
    # Value scraped from the "modhash" field of a user page, when requested.
    modhash: "str | None" = None
    # Value scraped from the "searchQueryId" field of the search page, when present.
    searchQueryId: "str | None" = None
- class Reddit():
- loginpage="https://www.reddit.com/login"
- testlogin="https://www.reddit.com/prefs.json"
- basepage="https://new.reddit.com/"
- gqlpage="https://gql.reddit.com/"
- oauthpage="https://oauth.reddit.com/api/"
- _core=None
- loggedin=False
- _authorizer=Authorizer()
- _token_manager=Tokens()
- timeout=30
- retries=3
- def __init__(self,username,password,two_factor_callback=None,user_agent="Fuuck You, Reddit v0.666",proxy=None):
- self._core=requests.session()
- self._core.headers.update({
- "User-Agent":user_agent,
- "Origin":self.basepage,
- "Referer":self.basepage,
- })
- self._authorizer._username=username
- self._authorizer._password=password
- self._authorizer._two_factor_callback=None
- self._core.cookiejar=f".{username}_cookies.pkl"
- self._core.debug=False
- if proxy:self._core.proxies.update(proxy)
- def Login(self):
- if not self._authorizer._username: #Anonymous login (guest)
- self.loggedin=True
- return True
- self.loggedin=False
- if self._core.cookiejar:
- try:
- with open(self._core.cookiejar,"rb") as c:g=pickle.load(c)
- if self._core.debug:print("Using cookie",end=" ",flush=True)
- if g["c"]:
- self._core.cookies.update(g["c"])
- self._token_manager=g["t"]
- if not self.testlogin:
- if self._core.debug:print("assumed ok")
- self.loggedin=True
- return True
- g=self._core.get(self.testlogin,timeout=self.timeout) #optionally check a page that required being logged in to verify the cookie works, otherwise (re)login
- if g.status_code==200:
- if self._core.debug:print("confirmed ok")
- self.loggedin=True
- return True
- if self._core.debug:print("failed")
- except:pass
- if self._core.debug:print("Logging in",end=" ",flush=True)
- g=self._core.get(self.loginpage,timeout=self.timeout)
- if g.status_code!=200:
- if self._core.debug:print(f"{g.status_code} failed")
- return False
- if "You are already logged in and will be redirected back to Reddit shortly." in g.text:
- if self._core.debug:print("already ok")
- self.SaveCookies()
- return True
- try:tok=re.search("csrf_token\" value=\"([^\"]+)",g.text).group(1)
- except:
- if self._core.debug:print("token not found")
- return False #Problem scraping token from login page
- g=self._core.post(self.loginpage,data={
- "csrf_token":tok,
- "otp":self._authorizer._two_factor_callback() if self._authorizer._two_factor_callback else "",
- "password":self._authorizer._password,
- "dest":self.basepage,
- "username":self._authorizer._username
- },timeout=self.timeout)
- if g.status_code!=200:
- if self._core.debug:print(f"{g.status_code} failed. Password?")
- return False #Login error... password?
- try:self._token_manager.accessToken=self._core.cookies["token_v2"]
- except:pass#self.NewToken()
- if self._core.debug:print("ok")
- self.SaveCookies()
- self.loggedin=True
- return True
- def Logout(self):
- self._core.post("https://www.reddit.com/logout",data={"access_token":self._token_manager.accessToken})
- self.loggedin=False
- self._token_manager=Tokens()
- self._core.cookies=[]
- if self._core.debug:print("Logged out")
- self.SaveCookies()
- OTHERTOKEN_SEARCH=0
- OTHERTOKEN_MODHASH=1
- def NewToken(self,extra=OTHERTOKEN_SEARCH):
- if not self.loggedin:
- if not self.Login():return None
- if self._core.debug:print("Get token",end=" ",flush=True)
- try:
- g=self._core.get(f"{self.basepage}search",timeout=self.timeout)#this page exposession.the token with not so much data
- #Don't set the global session.ion header because some actions will fail if it is set despite being valid
- self._token_manager.accessToken=re.search("accessToken\":\s*\"([^\"]+)",g.text).group(1)
- #Save some other tokens in case you need them, if they returned
- match extra:
- case self.OTHERTOKEN_SEARCH:
- try:self._token_manager.queryId=re.search("searchQueryId\":\s*\"([^\"]+)",g.text).group(1)
- except:pass
- case self.OTHERTOKEN_MODHASH:
- g=self._core.get(f"{self.basepage}user/"+self._authorizer.username+"?limit=1",timeout=self.timeout)
- try:self._token_manager.modhash=re.search("modhash\":\s*\"([^\"]+)",g.text).group(1)
- except:pass
- if self._core.debug:print("ok")
- self.SaveCookies()
- try:#Redundant headers reflect some session cookies, why not?
- self._core.headers.update({
- "x-reddit-loid":self._core.cookies.get_dict()["loid"],
- "x-reddit-session":self._core.cookies.get_dict()["session_tracker"],
- })
- except:pass
- return True
- except:
- if self._core.debug:print("failed")
- return False #failed to scrape a token
- def SaveCookies(self):
- if self._core.cookiejar:
- if self._core.debug:print("Saving cookies")
- try:#Save cookiejar
- with open(self._core.cookiejar,"wb") as c:pickle.dump({"c":self._core.cookies,"t":self._token_manager},c)
- except:pass
- def GQL(self,endpoint,variables=None,raw=False):
- for fix in range(self.retries):
- if not self.loggedin:
- if not self.Login():return None
- if not self._token_manager.accessToken:
- if not self.NewToken():return None
- if self._core.debug:print(f"GQL:{endpoint}")
- g=self._core.post(self.gqlpage,headers={"Authorization":"Bearer "+self._token_manager.accessToken},json={"id":endpoint,"variables":variables},timeout=self.timeout)
- if raw:return g
- match g.status_code:
- case 200:return g.json()
- case 500:continue
- case (401,403):self._token_manager.accessToken=None
- case default:
- if g.json()["code"]=="CSRF_EXPIRED":
- self._token_manager.accessToken=None
- continue
- try:return g.json()
- except:return {"error":g.status_code}
- def LazyGQL(self,request,raw=False):
- return self.GQL(request["id"],request["variables"] if "variables" in request else None,raw)
- def OAuth(self,endpoint,variables,post=None): #GET request if post==None, else POST as URI data - empty dict{}/array[] to post nothing/just variables
- for fix in range(self.retries):
- if not self.loggedin:
- if not self.Login():return None
- if not self._token_manager.accessToken:
- if not self.NewToken():return None
- if self._core.debug:print(f"OAuth:{endpoint}")
- if post is not None:
- g=self._core.post(self.oauthpage+endpoint+"?"+urllib.parse.urlencode(variables),headers={"Authorization":"Bearer "+self._token_manager.accessToken},body=urllib.parse.urlencode(post),timeout=self.timeout)
- else:
- g=self._core.get(self.oauthpage+endpoint+"?"+urllib.parse.urlencode(variables),headers={"Authorization":"Bearer "+self._token_manager.accessToken},timeout=self.timeout)
- match g.status_code:
- case 200:return g.json()
- case 500:continue
- case (401,403):self._token_manager.accessToken=None
- case default:
- try:return g.json()
- except:return {"error":g.status_code}
# End of paste. The Pastebin footer text ("Add Comment" / "Please, Sign In to
# add comment") is page chrome and not part of the script.