Advertisement
Guest User

Untitled

a guest
Aug 25th, 2019
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.64 KB | None | 0 0
  1. """This file is used for websocket authentication"""
  2. import base64
  3. from functools import wraps
  4.  
  5. import jwt
  6. from decouple import config
  7. from django.conf import settings
  8. from django.db import close_old_connections
  9.  
  10. from api.helpers.response_messages import auth_messages
  11. from api.models import User
  12.  
  13.  
  14. def authenticate_user(func):
  15. @wraps(func)
  16. def authenticate(self, content, **kwargs):
  17. user = self.scope.get("user")
  18. if not user or not user.id:
  19. authenticate_token(self, content)
  20. error = self.scope.get("error")
  21. if error:
  22. self.send_json(error)
  23. self.close()
  24.  
  25. func(self, content, **kwargs)
  26.  
  27. return authenticate
  28.  
  29.  
  30. def avoid_sending_data_user(func):
  31. @wraps(func)
  32. def check_authenticate(self, *args, **kwargs):
  33. user = self.scope.get("user")
  34. if not user or not user.id:
  35. return
  36. else:
  37. func(self, *args, **kwargs)
  38.  
  39. return check_authenticate
  40.  
  41.  
  42. def authenticate_token(scope, content):
  43. token = None if not content else content.get("jwt_token")
  44. if not token:
  45. scope["error"] = auth_messages["token_required"]
  46. else:
  47. if settings.TESTING:
  48. public_key = base64.b64decode(
  49. config('PUBLIC_KEY_TESTING')).decode("utf-8")
  50. else:
  51. public_key = base64.b64decode(
  52. config('PUBLIC_KEY')).decode("utf-8")
  53.  
  54. try:
  55. payload = jwt.decode(token,
  56. public_key,
  57. algorithms=['RS256'],
  58. options={'verify_aud': False})
  59. user = User.objects.only("id", "email", "name").get(email=payload["UserInfo"]["email"])
  60. scope["user"] = user
  61.  
  62. except jwt.exceptions.ExpiredSignatureError:
  63. scope["error"] = auth_messages["expired_token"]
  64. except jwt.exceptions.InvalidTokenError:
  65. scope["error"] = auth_messages["invalid_token"]
  66. except User.DoesNotExist:
  67. scope["error"] = auth_messages["not_found"]
  68.  
  69.  
  70. class TokenAuthenticationMiddleware:
  71. def __init__(self, inner):
  72. self.inner = inner
  73.  
  74. def __call__(self, scope):
  75. close_old_connections()
  76. headers = set(map(lambda x: x[0], scope['headers']))
  77. if b'sec-websocket-protocol' in headers:
  78. token = dict(scope['headers']).get(b'sec-websocket-protocol')
  79. authenticate_token(scope, {"jwt_token": token})
  80. # if token:
  81. # scope['user'] = 'Authenticated'
  82. # else:
  83. # scope['user'] = None
  84. else:
  85. scope['user'] = None
  86. return self.inner(scope)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement