kevlinsky

Untitled

Dec 9th, 2021
598
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import os
  2. import jwt
  3. from fastapi import HTTPException
  4. from passlib.context import CryptContext
  5. from datetime import datetime, timedelta
  6.  
  7.  
  8. class Auth:
  9.     hasher = CryptContext(schemes=['bcrypt'])
  10.     secret = os.getenv("APP_SECRET_STRING")
  11.  
  12.     def encode_password(self, password):
  13.         return self.hasher.hash(password)
  14.  
  15.     def verify_password(self, password, encoded_password):
  16.         return self.hasher.verify(password, encoded_password)
  17.  
  18.     def encode_token(self, email):
  19.         payload = {
  20.             'exp': datetime.utcnow() + timedelta(days=0, minutes=30),
  21.             'iat': datetime.utcnow(),
  22.             'scope': 'access_token',
  23.             'sub': email
  24.         }
  25.         return jwt.encode(
  26.             payload,
  27.             self.secret,
  28.             algorithm='HS256'
  29.         )
  30.  
  31.     def decode_token(self, token):
  32.         try:
  33.             payload = jwt.decode(token, self.secret, algorithms=['HS256'])
  34.             if payload['scope'] == 'access_token':
  35.                 return payload['sub']
  36.             raise HTTPException(status_code=401, detail='Scope for the token is invalid')
  37.         except jwt.ExpiredSignatureError:
  38.             raise HTTPException(status_code=401, detail='Token expired')
  39.         except jwt.InvalidTokenError:
  40.             raise HTTPException(status_code=401, detail='Invalid token')
  41.  
  42.     def encode_refresh_token(self, email):
  43.         payload = {
  44.             'exp': datetime.utcnow() + timedelta(days=0, hours=10),
  45.             'iat': datetime.utcnow(),
  46.             'scope': 'refresh_token',
  47.             'sub': email
  48.         }
  49.         return jwt.encode(
  50.             payload,
  51.             self.secret,
  52.             algorithm='HS256'
  53.         )
  54.  
  55.     def refresh_token(self, refresh_token):
  56.         try:
  57.             payload = jwt.decode(refresh_token, self.secret, algorithms=['HS256'])
  58.             if payload['scope'] == 'refresh_token':
  59.                 email = payload['sub']
  60.                 new_token = self.encode_token(email)
  61.                 return new_token
  62.             raise HTTPException(status_code=401, detail='Invalid scope for token')
  63.         except jwt.ExpiredSignatureError:
  64.             raise HTTPException(status_code=401, detail='Refresh token expired')
  65.         except jwt.InvalidTokenError:
  66.             raise HTTPException(status_code=401, detail='Invalid refresh token')
  67.  
  68.     def encode_confirm_token(self, email):
  69.         payload = {
  70.             'exp': datetime.utcnow() + timedelta(days=7),
  71.             'iat': datetime.utcnow(),
  72.             'scope': 'confirm_token',
  73.             'sub': email
  74.         }
  75.         return jwt.encode(
  76.             payload,
  77.             self.secret,
  78.             algorithm='HS256'
  79.         )
  80.  
  81.     def decode_confirm_token(self, token):
  82.         try:
  83.             payload = jwt.decode(token, self.secret, algorithms=['HS256'])
  84.             if payload['scope'] == 'confirm_token':
  85.                 return payload['sub']
  86.             raise HTTPException(status_code=401, detail='Scope for the token is invalid')
  87.         except jwt.ExpiredSignatureError:
  88.             raise HTTPException(status_code=401, detail='Token expired')
  89.         except jwt.InvalidTokenError:
  90.             raise HTTPException(status_code=401, detail='Invalid token')
RAW Paste Data