EkriirkE

PYRAPI.py

Aug 25th, 2023
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.14 KB | None | 0 0
  1. #Fiddly stuff by SpambotSwatter
  2. #Found by watching requests from new.reddit.com actions in your browser F12 debug console
  3.  
  4. import requests
  5. import re
  6. import pickle
  7. import urllib
  8. import json
  9. import time
  10. import base64
  11.  
  12. class Authorizer():
  13.     _username=None
  14.     _password=None
  15.     _two_factor_callback=None
  16.  
  17. class Tokens():
  18.     accessToken=None
  19.     modhash=None
  20.     searchQueryId=None
  21.  
  22. class Reddit():
  23.     loginpage="https://www.reddit.com/login"
  24.     testlogin="https://www.reddit.com/prefs.json"
  25.     basepage="https://new.reddit.com/"
  26.     gqlpage="https://gql.reddit.com/"
  27.     oauthpage="https://oauth.reddit.com/api/"
  28.     _core=None
  29.     loggedin=False
  30.     _authorizer=Authorizer()
  31.     _token_manager=Tokens()
  32.     timeout=30
  33.     retries=3
  34.  
  35.     def __init__(self,username,password,two_factor_callback=None,user_agent="Fuuck You, Reddit v0.666",proxy=None):
  36.         self._core=requests.session()
  37.         self._core.headers.update({
  38.             "User-Agent":user_agent,
  39.             "Origin":self.basepage,
  40.             "Referer":self.basepage,
  41.             })
  42.         self._authorizer._username=username
  43.         self._authorizer._password=password
  44.         self._authorizer._two_factor_callback=None
  45.         self._core.cookiejar=f".{username}_cookies.pkl"
  46.         self._core.debug=False
  47.         if proxy:self._core.proxies.update(proxy)
  48.  
  49.     def Login(self):
  50.         if not self._authorizer._username:  #Anonymous login (guest)
  51.             self.loggedin=True
  52.             return True
  53.         self.loggedin=False
  54.         if self._core.cookiejar:
  55.             try:
  56.                 with open(self._core.cookiejar,"rb") as c:g=pickle.load(c)
  57.                 if self._core.debug:print("Using cookie",end=" ",flush=True)
  58.                 if g["c"]:
  59.                     self._core.cookies.update(g["c"])
  60.                     self._token_manager=g["t"]
  61.                    
  62.                     if not self.testlogin:
  63.                         if self._core.debug:print("assumed ok")
  64.                         self.loggedin=True
  65.                         return True
  66.                     g=self._core.get(self.testlogin,timeout=self.timeout)   #optionally check a page that required being logged in to verify the cookie works, otherwise (re)login
  67.                     if g.status_code==200:
  68.                         if self._core.debug:print("confirmed ok")
  69.                         self.loggedin=True
  70.                         return True
  71.                 if self._core.debug:print("failed")
  72.             except:pass
  73.         if self._core.debug:print("Logging in",end=" ",flush=True)
  74.         g=self._core.get(self.loginpage,timeout=self.timeout)
  75.         if g.status_code!=200:
  76.             if self._core.debug:print(f"{g.status_code} failed")
  77.             return False
  78.        
  79.         if "You are already logged in and will be redirected back to Reddit shortly." in g.text:
  80.             if self._core.debug:print("already ok")
  81.             self.SaveCookies()
  82.             return True
  83.        
  84.         try:tok=re.search("csrf_token\" value=\"([^\"]+)",g.text).group(1)
  85.         except:
  86.             if self._core.debug:print("token not found")
  87.             return False    #Problem scraping token from login page
  88.        
  89.         g=self._core.post(self.loginpage,data={
  90.             "csrf_token":tok,
  91.             "otp":self._authorizer._two_factor_callback() if self._authorizer._two_factor_callback else "",
  92.             "password":self._authorizer._password,
  93.             "dest":self.basepage,
  94.             "username":self._authorizer._username
  95.             },timeout=self.timeout)
  96.         if g.status_code!=200:
  97.             if self._core.debug:print(f"{g.status_code} failed.  Password?")
  98.             return False    #Login error... password?
  99.        
  100.         try:self._token_manager.accessToken=self._core.cookies["token_v2"]
  101.         except:pass#self.NewToken()
  102.        
  103.         if self._core.debug:print("ok")
  104.                
  105.         self.SaveCookies()
  106.         self.loggedin=True
  107.         return True
  108.    
  109.     def Logout(self):
  110.         self._core.post("https://www.reddit.com/logout",data={"access_token":self._token_manager.accessToken})
  111.         self.loggedin=False
  112.         self._token_manager=Tokens()
  113.         self._core.cookies=[]
  114.         if self._core.debug:print("Logged out")
  115.         self.SaveCookies()
  116.  
  117.     OTHERTOKEN_SEARCH=0
  118.     OTHERTOKEN_MODHASH=1
  119.     def NewToken(self,extra=OTHERTOKEN_SEARCH):
  120.         if not self.loggedin:
  121.             if not self.Login():return None
  122.         if self._core.debug:print("Get token",end=" ",flush=True)
  123.         try:
  124.             g=self._core.get(f"{self.basepage}search",timeout=self.timeout)#this page exposession.the token with not so much data
  125.             #Don't set the global session.ion header because some actions will fail if it is set despite being valid
  126.             self._token_manager.accessToken=re.search("accessToken\":\s*\"([^\"]+)",g.text).group(1)
  127.             #Save some other tokens in case you need them, if they returned
  128.             match extra:
  129.                 case self.OTHERTOKEN_SEARCH:
  130.                     try:self._token_manager.queryId=re.search("searchQueryId\":\s*\"([^\"]+)",g.text).group(1)
  131.                     except:pass
  132.                 case self.OTHERTOKEN_MODHASH:
  133.                     g=self._core.get(f"{self.basepage}user/"+self._authorizer.username+"?limit=1",timeout=self.timeout)
  134.                     try:self._token_manager.modhash=re.search("modhash\":\s*\"([^\"]+)",g.text).group(1)
  135.                     except:pass
  136.             if self._core.debug:print("ok")
  137.             self.SaveCookies()
  138.             try:#Redundant headers reflect some session cookies, why not?
  139.                 self._core.headers.update({
  140.                     "x-reddit-loid":self._core.cookies.get_dict()["loid"],
  141.                     "x-reddit-session":self._core.cookies.get_dict()["session_tracker"],
  142.                     })
  143.             except:pass
  144.             return True
  145.         except:
  146.             if self._core.debug:print("failed")
  147.             return False    #failed to scrape a token
  148.    
  149.     def SaveCookies(self):
  150.         if self._core.cookiejar:
  151.             if self._core.debug:print("Saving cookies")
  152.             try:#Save cookiejar
  153.                 with open(self._core.cookiejar,"wb") as c:pickle.dump({"c":self._core.cookies,"t":self._token_manager},c)
  154.             except:pass
  155.  
  156.     def GQL(self,endpoint,variables=None,raw=False):
  157.         for fix in range(self.retries):
  158.             if not self.loggedin:
  159.                 if not self.Login():return None
  160.             if not self._token_manager.accessToken:
  161.                 if not self.NewToken():return None
  162.             if self._core.debug:print(f"GQL:{endpoint}")
  163.             g=self._core.post(self.gqlpage,headers={"Authorization":"Bearer "+self._token_manager.accessToken},json={"id":endpoint,"variables":variables},timeout=self.timeout)
  164.             if raw:return g
  165.             match g.status_code:
  166.                 case 200:return g.json()
  167.                 case 500:continue
  168.                 case (401,403):self._token_manager.accessToken=None
  169.                 case default:
  170.                     if g.json()["code"]=="CSRF_EXPIRED":
  171.                         self._token_manager.accessToken=None
  172.                         continue
  173.                     try:return g.json()
  174.                     except:return {"error":g.status_code}
  175.  
  176.     def LazyGQL(self,request,raw=False):
  177.         return self.GQL(request["id"],request["variables"] if "variables" in request else None,raw)
  178.  
  179.     def OAuth(self,endpoint,variables,post=None):   #GET request if post==None, else POST as URI data - empty dict{}/array[] to post nothing/just variables
  180.         for fix in range(self.retries):
  181.             if not self.loggedin:
  182.                 if not self.Login():return None
  183.             if not self._token_manager.accessToken:
  184.                 if not self.NewToken():return None
  185.             if self._core.debug:print(f"OAuth:{endpoint}")
  186.             if post is not None:
  187.                 g=self._core.post(self.oauthpage+endpoint+"?"+urllib.parse.urlencode(variables),headers={"Authorization":"Bearer "+self._token_manager.accessToken},body=urllib.parse.urlencode(post),timeout=self.timeout)
  188.             else:
  189.                 g=self._core.get(self.oauthpage+endpoint+"?"+urllib.parse.urlencode(variables),headers={"Authorization":"Bearer "+self._token_manager.accessToken},timeout=self.timeout)
  190.             match g.status_code:
  191.                 case 200:return g.json()
  192.                 case 500:continue
  193.                 case (401,403):self._token_manager.accessToken=None
  194.                 case default:
  195.                     try:return g.json()
  196.                     except:return {"error":g.status_code}
  197.  
Add Comment
Please, Sign In to add comment