SHARE
TWEET

Untitled

a guest Aug 25th, 2019 71 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. """This file is used for websocket authentication"""
  2. import base64
  3. from functools import wraps
  4.  
  5. import jwt
  6. from decouple import config
  7. from django.conf import settings
  8. from django.db import close_old_connections
  9.  
  10. from api.helpers.response_messages import auth_messages
  11. from api.models import User
  12.  
  13.  
  14. def authenticate_user(func):
  15.     @wraps(func)
  16.     def authenticate(self, content, **kwargs):
  17.         user = self.scope.get("user")
  18.         if not user or not user.id:
  19.             authenticate_token(self, content)
  20.             error = self.scope.get("error")
  21.             if error:
  22.                 self.send_json(error)
  23.                 self.close()
  24.  
  25.         func(self, content, **kwargs)
  26.  
  27.     return authenticate
  28.  
  29.  
  30. def avoid_sending_data_user(func):
  31.     @wraps(func)
  32.     def check_authenticate(self, *args, **kwargs):
  33.         user = self.scope.get("user")
  34.         if not user or not user.id:
  35.             return
  36.         else:
  37.             func(self, *args, **kwargs)
  38.  
  39.     return check_authenticate
  40.  
  41.  
  42. def authenticate_token(scope, content):
  43.     token = None if not content else content.get("jwt_token")
  44.     if not token:
  45.         scope["error"] = auth_messages["token_required"]
  46.     else:
  47.         if settings.TESTING:
  48.             public_key = base64.b64decode(
  49.                 config('PUBLIC_KEY_TESTING')).decode("utf-8")
  50.         else:
  51.             public_key = base64.b64decode(
  52.                 config('PUBLIC_KEY')).decode("utf-8")
  53.  
  54.         try:
  55.             payload = jwt.decode(token,
  56.                                  public_key,
  57.                                  algorithms=['RS256'],
  58.                                  options={'verify_aud': False})
  59.             user = User.objects.only("id", "email", "name").get(email=payload["UserInfo"]["email"])
  60.             scope["user"] = user
  61.  
  62.         except jwt.exceptions.ExpiredSignatureError:
  63.             scope["error"] = auth_messages["expired_token"]
  64.         except jwt.exceptions.InvalidTokenError:
  65.             scope["error"] = auth_messages["invalid_token"]
  66.         except User.DoesNotExist:
  67.             scope["error"] = auth_messages["not_found"]
  68.  
  69.  
  70. class TokenAuthenticationMiddleware:
  71.     def __init__(self, inner):
  72.         self.inner = inner
  73.  
  74.     def __call__(self, scope):
  75.         close_old_connections()
  76.         headers = set(map(lambda x: x[0], scope['headers']))
  77.         if b'sec-websocket-protocol' in headers:
  78.             token = dict(scope['headers']).get(b'sec-websocket-protocol')
  79.             authenticate_token(scope, {"jwt_token": token})
  80.             # if token:
  81.             #     scope['user'] = 'Authenticated'
  82.             # else:
  83.             #     scope['user'] = None
  84.         else:
  85.             scope['user'] = None
  86.         return self.inner(scope)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top