Advertisement
Guest User

Untitled

a guest
May 23rd, 2019
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.83 KB | None | 0 0
  1. import random
  2. from datetime import datetime
  3.  
  4. from jwt.exceptions import DecodeError
  5. from sanic import Sanic
  6. from sanic.response import json
  7. from sanic_jwt import (
  8. Authentication,
  9. Claim,
  10. Configuration,
  11. Initialize,
  12. Responses,
  13. exceptions,
  14. )
  15.  
  16.  
  17. class User:
  18. def __init__(self, _id, username):
  19. self.user_id = _id
  20. self.username = username
  21. self._permakey = None
  22. self._last_login = None
  23.  
  24. def __repr__(self):
  25. return "User(id='{}')".format(self.user_id)
  26.  
  27. def to_dict(self):
  28. return {
  29. "user_id": self.user_id,
  30. "username": self.username,
  31. "last_login": self.last_login,
  32. }
  33.  
  34. @property
  35. def permakey(self):
  36. return self._permakey
  37.  
  38. @permakey.setter
  39. def permakey(self, value):
  40. self._permakey = value
  41.  
  42. @property
  43. def last_login(self):
  44. return self._last_login
  45.  
  46. @last_login.setter
  47. def last_login(self, value):
  48. self._last_login = value
  49.  
  50.  
  51. USERS = []
  52. username_table = {u.username: u for u in USERS}
  53. userid_table = {u.user_id: u for u in USERS}
  54.  
  55.  
  56. class MyConfig(Configuration):
  57. def get_verify_exp(self, request):
  58. """
  59. If the request is with the "permakey", then we do not want to check for expiration
  60. """
  61. return not "permakey" in request.headers
  62.  
  63.  
  64. class MyAuthentication(Authentication):
  65. def _verify(
  66. self,
  67. request,
  68. return_payload=False,
  69. verify=True,
  70. raise_missing=False,
  71. request_args=None,
  72. request_kwargs=None,
  73. *args,
  74. **kwargs
  75. ):
  76. """
  77. If there is a "permakey", then we will verify the token by checking the
  78. database. Otherwise, just do the normal verification.
  79.  
  80. Typically, any method that begins with an underscore in sanic-jwt should
  81. not be touched. In this case, we are trying to break the rules a bit to handle
  82. a unique use case: handle both expirable and non-expirable tokens.
  83. """
  84. if "permakey" in request.headers:
  85. # Extract the permakey from the headers
  86. permakey = request.headers.get("permakey")
  87.  
  88. try:
  89. payload = self._decode(permakey, verify=verify)
  90. except DecodeError:
  91. if return_payload:
  92. return {}
  93. return False, 400, "Invalid token format"
  94.  
  95. # Sometimes, the application will call _verify(...return_payload=True)
  96. # So, let's make sure to handle this scenario.
  97. if return_payload:
  98. return payload
  99.  
  100. request_kwargs = request_kwargs or {}
  101. user = request_kwargs.get("user")
  102. # If wer cannot find a user, then this method should return
  103. # is_valid == False
  104. # reason == some text for why
  105. # status == some status code, probably a 401
  106. if not user:
  107. is_valid = False
  108. reason = "No user found"
  109. status = 401
  110. else:
  111. # After finding a user, make sure the permakey matches,
  112. # or else return a bad status or some other error.
  113. # In production, both this scenario, and the above "No user found"
  114. # scenario should return an identical message and status code.
  115. # This is to prevent your application accidentally
  116. # leaking information about the existence or non-existence of users.
  117. is_valid = user.permakey == permakey
  118. reason = None if is_valid else "Permakey mismatch"
  119. status = 200 if is_valid else 401
  120.  
  121. return is_valid, status, reason
  122. else:
  123. return super()._verify(
  124. request=request,
  125. return_payload=return_payload,
  126. verify=verify,
  127. raise_missing=raise_missing,
  128. request_args=request_args,
  129. request_kwargs=request_kwargs,
  130. *args,
  131. **kwargs
  132. )
  133.  
  134. async def authenticate(self, request, *args, **kwargs):
  135. username = request.json.get("username", None)
  136. password = request.json.get("password", None)
  137.  
  138. if not username or not password:
  139. raise exceptions.AuthenticationFailed(
  140. "Missing username or password."
  141. )
  142.  
  143. # Here, you would want to try to verify the username and password.
  144. # In this example, we are simply creating a new user if it does not
  145. # exist, and then authenticating the new user.
  146. user = username_table.get(username)
  147. if not user:
  148. user = User(len(USERS) + 1, username)
  149. USERS.append(user)
  150. username_table.update({user.username: user})
  151. userid_table.update({user.user_id: user})
  152.  
  153. user.permakey = await self.generate_access_token(user)
  154.  
  155. user.last_login = datetime.utcnow().strftime("%c")
  156.  
  157. return user
  158.  
  159. async def retrieve_user(self, request, payload, *args, **kwargs):
  160. if payload:
  161. user_id = payload.get("user_id", None)
  162. return userid_table.get(user_id)
  163.  
  164. else:
  165. return None
  166.  
  167.  
  168. def my_payload_extender(payload, *args, **kwargs):
  169. user_id = payload.get("user_id", None)
  170. user = userid_table.get(user_id)
  171. payload.update({"username": user.username})
  172.  
  173. return payload
  174.  
  175.  
  176. class MyResponses(Responses):
  177. @staticmethod
  178. def extend_authenticate(
  179. request, user=None, access_token=None, refresh_token=None
  180. ):
  181. return {"permakey": user.permakey}
  182.  
  183.  
  184. class RandomClaim(Claim):
  185. """
  186. This custom claim is not necessary. It is merely being added so that everytime
  187. that await Authentication.generate_access_token() is being called, it will
  188. provide a different token. It is for illustrative purposes only
  189. """
  190.  
  191. key = "rand"
  192.  
  193. def setup(self, *args, **kwargs):
  194. return random.random()
  195.  
  196. def verify(self, *args, **kwargs):
  197. return True
  198.  
  199.  
  200. # elsewhere in the universe ...
  201. if __name__ == "__main__":
  202. app = Sanic(__name__)
  203.  
  204. sanicjwt = Initialize(
  205. app,
  206. authentication_class=MyAuthentication,
  207. configuration_class=MyConfig,
  208. # Following settings are for example purposes only
  209. responses_class=MyResponses,
  210. custom_claims=[RandomClaim],
  211. extend_payload=my_payload_extender,
  212. expiration_delta=15,
  213. leeway=0,
  214. )
  215.  
  216. @app.route("/")
  217. async def helloworld(request):
  218. return json({"hello": "world"})
  219.  
  220. @app.route("/protected")
  221. @sanicjwt.inject_user()
  222. @sanicjwt.protected()
  223. async def protected_request(request, user):
  224. return json({"protected": True})
  225.  
  226. # this route is for demonstration only
  227.  
  228. @app.route("/cache")
  229. @sanicjwt.protected()
  230. async def protected_cache(request):
  231. print(USERS)
  232. return json(userid_table)
  233.  
  234. app.run(debug=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement