Advertisement
kevlinsky

Untitled

Dec 9th, 2021
712
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.22 KB | None | 0 0
  1. import os
  2. import jwt
  3. from fastapi import HTTPException
  4. from passlib.context import CryptContext
  5. from datetime import datetime, timedelta
  6.  
  7.  
  8. class Auth:
  9.     hasher = CryptContext(schemes=['bcrypt'])
  10.     secret = os.getenv("APP_SECRET_STRING")
  11.  
  12.     def encode_password(self, password):
  13.         return self.hasher.hash(password)
  14.  
  15.     def verify_password(self, password, encoded_password):
  16.         return self.hasher.verify(password, encoded_password)
  17.  
  18.     def encode_token(self, email):
  19.         payload = {
  20.             'exp': datetime.utcnow() + timedelta(days=0, minutes=30),
  21.             'iat': datetime.utcnow(),
  22.             'scope': 'access_token',
  23.             'sub': email
  24.         }
  25.         return jwt.encode(
  26.             payload,
  27.             self.secret,
  28.             algorithm='HS256'
  29.         )
  30.  
  31.     def decode_token(self, token):
  32.         try:
  33.             payload = jwt.decode(token, self.secret, algorithms=['HS256'])
  34.             if payload['scope'] == 'access_token':
  35.                 return payload['sub']
  36.             raise HTTPException(status_code=401, detail='Scope for the token is invalid')
  37.         except jwt.ExpiredSignatureError:
  38.             raise HTTPException(status_code=401, detail='Token expired')
  39.         except jwt.InvalidTokenError:
  40.             raise HTTPException(status_code=401, detail='Invalid token')
  41.  
  42.     def encode_refresh_token(self, email):
  43.         payload = {
  44.             'exp': datetime.utcnow() + timedelta(days=0, hours=10),
  45.             'iat': datetime.utcnow(),
  46.             'scope': 'refresh_token',
  47.             'sub': email
  48.         }
  49.         return jwt.encode(
  50.             payload,
  51.             self.secret,
  52.             algorithm='HS256'
  53.         )
  54.  
  55.     def refresh_token(self, refresh_token):
  56.         try:
  57.             payload = jwt.decode(refresh_token, self.secret, algorithms=['HS256'])
  58.             if payload['scope'] == 'refresh_token':
  59.                 email = payload['sub']
  60.                 new_token = self.encode_token(email)
  61.                 return new_token
  62.             raise HTTPException(status_code=401, detail='Invalid scope for token')
  63.         except jwt.ExpiredSignatureError:
  64.             raise HTTPException(status_code=401, detail='Refresh token expired')
  65.         except jwt.InvalidTokenError:
  66.             raise HTTPException(status_code=401, detail='Invalid refresh token')
  67.  
  68.     def encode_confirm_token(self, email):
  69.         payload = {
  70.             'exp': datetime.utcnow() + timedelta(days=7),
  71.             'iat': datetime.utcnow(),
  72.             'scope': 'confirm_token',
  73.             'sub': email
  74.         }
  75.         return jwt.encode(
  76.             payload,
  77.             self.secret,
  78.             algorithm='HS256'
  79.         )
  80.  
  81.     def decode_confirm_token(self, token):
  82.         try:
  83.             payload = jwt.decode(token, self.secret, algorithms=['HS256'])
  84.             if payload['scope'] == 'confirm_token':
  85.                 return payload['sub']
  86.             raise HTTPException(status_code=401, detail='Scope for the token is invalid')
  87.         except jwt.ExpiredSignatureError:
  88.             raise HTTPException(status_code=401, detail='Token expired')
  89.         except jwt.InvalidTokenError:
  90.             raise HTTPException(status_code=401, detail='Invalid token')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement