Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from django.conf import settings
- from firebase_admin import auth as firebase_auth
- from django.utils.encoding import smart_text
- from django.utils import timezone
- from django.contrib.auth import get_user_model
- from django.contrib.auth.models import AnonymousUser
- from rest_framework import (
- authentication,
- exceptions
- )
- from rest_framework.settings import api_settings
- from django.utils.translation import ugettext_lazy as _
- User = get_user_model()
- class BaseFirebaseAuthentication(authentication.BaseAuthentication):
- """
- Token based authentication using firebase.
- """
- def authenticate(self, request):
- """
- With ALLOW_ANONYMOUS_REQUESTS, set request.user to an AnonymousUser,
- allowing us to configure access at the permissions level.
- """
- authorization_header = authentication.get_authorization_header(request)
- if getattr(api_settings, 'ALLOW_ANONYMOUS_REQUESTS', False) and not authorization_header:
- return AnonymousUser(), None
- """
- Returns a tuple of len(2) of `User` and the decoded firebase token if
- a valid signature has been supplied using Firebase authentication.
- """
- firebase_token = self.get_token(request)
- decoded_token = self.decode_token(firebase_token)
- uid = self.authenticate_token(decoded_token)
- local_user = self.get_local_user(uid)
- return local_user, decoded_token
- def get_token(self, request):
- raise NotImplementedError('get_token() has not been implemented.')
- def decode_token(self, firebase_token):
- raise NotImplementedError('decode_token() has not been implemented.')
- def authenticate_token(self, decoded_token):
- raise NotImplementedError('authenticate_token() has not been implemented.')
- def get_local_user(self, firebase_user):
- raise NotImplementedError('get_or_create_local_user() has not been implemented.')
- class FirebaseAuthentication(BaseFirebaseAuthentication):
- """
- Clients should authenticate by passing the token key in the
- 'Authorization' HTTP header, prepended with the string specified in the
- settings.FIREBASE_AUTH_HEADER_PREFIX setting (Default = 'Token')
- """
- www_authenticate_realm = 'api'
- def get_token(self, request):
- """
- Parse Authorization header and retrieve JWT
- """
- authorization_header = \
- authentication.get_authorization_header(request).split()
- auth_header_prefix = settings.FIREBASE_AUTH_HEADER_PREFIX.lower()
- if not authorization_header or len(authorization_header) != 2:
- raise exceptions.AuthenticationFailed(
- _('Invalid Authorization header format, expecting: JWT <token>.')
- )
- if smart_text(authorization_header[0].lower()) != auth_header_prefix:
- raise exceptions.AuthenticationFailed(
- _('Invalid Authorization header prefix, expecting: JWT.')
- )
- return authorization_header[1]
- def decode_token(self, firebase_token):
- """
- Attempt to verify JWT from Authorization header with Firebase and
- return the decoded token
- """
- try:
- return firebase_auth.verify_id_token(
- firebase_token,
- check_revoked=True
- )
- except ValueError as exc:
- raise exceptions.AuthenticationFailed(
- _('JWT was found to be invalid, or the App’s project ID cannot be determined.')
- )
- except firebase_auth.AuthError as exc:
- if exc.code == 'ID_TOKEN_REVOKED':
- raise exceptions.AuthenticationFailed(
- _('Token revoked, inform the user to reauthenticate or signOut().')
- )
- else:
- raise exceptions.AuthenticationFailed(
- _('Token is invalid.')
- )
- def authenticate_token(self, decoded_token):
- """
- Returns firebase user uid if token is authenticated
- """
- try:
- uid = decoded_token.get('uid')
- return uid
- except ValueError:
- raise exceptions.AuthenticationFailed(
- _('User ID is None, empty or malformed')
- )
- except firebase_auth.AuthError:
- raise exceptions.AuthenticationFailed(
- _('Error retrieving the user, or the specified user ID does not exist')
- )
- def get_local_user(self, uid):
- try:
- user = User.objects.get(username=uid)
- if not user.is_active:
- raise exceptions.AuthenticationFailed(
- _('User account is not currently active.')
- )
- user.last_login = timezone.now()
- user.save()
- return user
- except User.DoesNotExist:
- raise exceptions.AuthenticationFailed(_('Invalid token.'))
- def authenticate_header(self, request):
- """
- Return a string to be used as the value of the `WWW-Authenticate`
- header in a `401 Unauthenticated` response, or `None` if the
- authentication scheme should return `403 Permission Denied` responses.
- """
- auth_header_prefix = settings.FIREBASE_AUTH_HEADER_PREFIX.lower()
- return '{} realm="{}"'.format(auth_header_prefix, self.www_authenticate_realm)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement