Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """This file is used for websocket authentication"""
- import base64
- from functools import wraps
- import jwt
- from decouple import config
- from django.conf import settings
- from django.db import close_old_connections
- from api.helpers.response_messages import auth_messages
- from api.models import User
- def authenticate_user(func):
- @wraps(func)
- def authenticate(self, content, **kwargs):
- user = self.scope.get("user")
- if not user or not user.id:
- authenticate_token(self, content)
- error = self.scope.get("error")
- if error:
- self.send_json(error)
- self.close()
- func(self, content, **kwargs)
- return authenticate
- def avoid_sending_data_user(func):
- @wraps(func)
- def check_authenticate(self, *args, **kwargs):
- user = self.scope.get("user")
- if not user or not user.id:
- return
- else:
- func(self, *args, **kwargs)
- return check_authenticate
- def authenticate_token(scope, content):
- token = None if not content else content.get("jwt_token")
- if not token:
- scope["error"] = auth_messages["token_required"]
- else:
- if settings.TESTING:
- public_key = base64.b64decode(
- config('PUBLIC_KEY_TESTING')).decode("utf-8")
- else:
- public_key = base64.b64decode(
- config('PUBLIC_KEY')).decode("utf-8")
- try:
- payload = jwt.decode(token,
- public_key,
- algorithms=['RS256'],
- options={'verify_aud': False})
- user = User.objects.only("id", "email", "name").get(email=payload["UserInfo"]["email"])
- scope["user"] = user
- except jwt.exceptions.ExpiredSignatureError:
- scope["error"] = auth_messages["expired_token"]
- except jwt.exceptions.InvalidTokenError:
- scope["error"] = auth_messages["invalid_token"]
- except User.DoesNotExist:
- scope["error"] = auth_messages["not_found"]
- class TokenAuthenticationMiddleware:
- def __init__(self, inner):
- self.inner = inner
- def __call__(self, scope):
- close_old_connections()
- headers = set(map(lambda x: x[0], scope['headers']))
- if b'sec-websocket-protocol' in headers:
- token = dict(scope['headers']).get(b'sec-websocket-protocol')
- authenticate_token(scope, {"jwt_token": token})
- # if token:
- # scope['user'] = 'Authenticated'
- # else:
- # scope['user'] = None
- else:
- scope['user'] = None
- return self.inner(scope)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement