elyte5star

JWT Authentication

Jul 22nd, 2025
400
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.21 KB | Software | 0 0
  1. from starlette.status import (
  2.     HTTP_401_UNAUTHORIZED,
  3.     HTTP_403_FORBIDDEN,
  4.     HTTP_404_NOT_FOUND,
  5. )
  6. from starlette.requests import Request
  7. from fastapi.security.utils import get_authorization_scheme_param
  8. from fastapi.openapi.models import HTTPBearer as HTTPBearerModel
  9. from jose import JWTError, jwt
  10. import time
  11. from modules.settings.configuration import ApiConfig
  12. from modules.repository.queries.common import CommonQueries
  13. from modules.security.current_user import JWTPrincipal
  14. from fastapi.security.base import SecurityBase
  15. from fastapi.exceptions import HTTPException
  16. from pydantic import BaseModel, Field
  17.  
  18.  
  19. cfg = ApiConfig().from_toml_file().from_env_file()
  20. queries = CommonQueries(cfg)
  21.  
  22.  
  23. class JWTPrincipal(BaseModel):
  24.     user_id: str = Field(serialization_alias="userId")
  25.     username: str
  26.     email: str
  27.     active: bool
  28.     enabled: bool
  29.     roles: list[str]
  30.     admin: bool
  31.     expires: float
  32.     discount: float
  33.     token_id: str = Field(serialization_alias="tokenId")
  34.  
  35.  
  36.  
  37. class JWTBearer(SecurityBase):
  38.  
  39.     def __init__(
  40.         self,
  41.         scheme_name: str | None = None,
  42.         auto_error: bool = True,
  43.         allowed_roles: list = cfg.roles,
  44.         auth_method: str | None = None,
  45.     ) -> None:
  46.         super().__init__()
  47.         self.auto_error = auto_error
  48.         self.scheme_name = scheme_name or self.__class__.__name__
  49.         self.allowed_roles = allowed_roles
  50.         self.model = HTTPBearerModel(
  51.             description="Bearer token",
  52.         )
  53.  
  54.     async def __call__(self, request: Request) -> JWTPrincipal | None:
  55.         authorization = request.headers.get("Authorization", None)
  56.         scheme, token = get_authorization_scheme_param(authorization)
  57.         if not (authorization and scheme and token):
  58.             if self.auto_error:
  59.                 raise HTTPException(
  60.                     status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
  61.                 )
  62.             else:
  63.                 return None
  64.         if scheme.lower() != "bearer":
  65.             if self.auto_error:
  66.                 raise HTTPException(
  67.                     status_code=HTTP_401_UNAUTHORIZED,
  68.                     detail="Invalid authentication credentials",
  69.                 )
  70.             else:
  71.                 return None
  72.         if self.verify_jwt(token) is None:
  73.             if self.auto_error:
  74.                 raise HTTPException(
  75.                     status_code=HTTP_401_UNAUTHORIZED,
  76.                     detail="Invalid token or expired token.",
  77.                     headers={"WWW-Authenticate": "Bearer"},
  78.                 )
  79.             else:
  80.                 return None
  81.         current_user = await self.check_user(self.payload["userId"])
  82.         return current_user
  83.  
  84.     async def check_user(self, user_id: str) -> JWTPrincipal:
  85.         db_user = await queries.find_user_by_id(user_id)
  86.         if db_user is None:
  87.             raise HTTPException(
  88.                 status_code=HTTP_404_NOT_FOUND,
  89.                 detail="User session not found",
  90.             )
  91.         roles = ["USER"] if not db_user.admin else ["USER", "ADMIN"]
  92.         matches = set(self.allowed_roles).intersection(set(roles))
  93.         if len(matches) == 0:
  94.             raise HTTPException(
  95.                 status_code=HTTP_403_FORBIDDEN, detail="Not enough permissions"
  96.             )
  97.         current_user = JWTPrincipal(
  98.             user_id=self.payload["userId"],
  99.             email=self.payload["email"],
  100.             username=self.payload["sub"],
  101.             active=self.payload["active"],
  102.             enabled=self.payload["enabled"],
  103.             expires=self.payload["exp"],
  104.             admin=self.payload["admin"],
  105.             roles=self.payload["roles"],
  106.             discount=self.payload["discount"],
  107.             token_id=self.payload["jti"],
  108.         )
  109.         return current_user
  110.  
  111.     def verify_jwt(self, token: str) -> dict | None:
  112.         if token is None:
  113.             return None
  114.         try:
  115.             self.payload = jwt.decode(
  116.                 token,
  117.                 cfg.secret_key,
  118.                 algorithms=[cfg.algorithm],
  119.             )
  120.             return self.payload if self.payload["exp"] >= time.time() else None
  121.         except JWTError:
  122.             return None
  123.  
Advertisement
Add Comment
Please, Sign In to add comment