Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import jwt
- from fastapi import HTTPException
- from passlib.context import CryptContext
- from datetime import datetime, timedelta
- class Auth:
- hasher = CryptContext(schemes=['bcrypt'])
- secret = os.getenv("APP_SECRET_STRING")
- def encode_password(self, password):
- return self.hasher.hash(password)
- def verify_password(self, password, encoded_password):
- return self.hasher.verify(password, encoded_password)
- def encode_token(self, email):
- payload = {
- 'exp': datetime.utcnow() + timedelta(days=0, minutes=30),
- 'iat': datetime.utcnow(),
- 'scope': 'access_token',
- 'sub': email
- }
- return jwt.encode(
- payload,
- self.secret,
- algorithm='HS256'
- )
- def decode_token(self, token):
- try:
- payload = jwt.decode(token, self.secret, algorithms=['HS256'])
- if payload['scope'] == 'access_token':
- return payload['sub']
- raise HTTPException(status_code=401, detail='Scope for the token is invalid')
- except jwt.ExpiredSignatureError:
- raise HTTPException(status_code=401, detail='Token expired')
- except jwt.InvalidTokenError:
- raise HTTPException(status_code=401, detail='Invalid token')
- def encode_refresh_token(self, email):
- payload = {
- 'exp': datetime.utcnow() + timedelta(days=0, hours=10),
- 'iat': datetime.utcnow(),
- 'scope': 'refresh_token',
- 'sub': email
- }
- return jwt.encode(
- payload,
- self.secret,
- algorithm='HS256'
- )
- def refresh_token(self, refresh_token):
- try:
- payload = jwt.decode(refresh_token, self.secret, algorithms=['HS256'])
- if payload['scope'] == 'refresh_token':
- email = payload['sub']
- new_token = self.encode_token(email)
- return new_token
- raise HTTPException(status_code=401, detail='Invalid scope for token')
- except jwt.ExpiredSignatureError:
- raise HTTPException(status_code=401, detail='Refresh token expired')
- except jwt.InvalidTokenError:
- raise HTTPException(status_code=401, detail='Invalid refresh token')
- def encode_confirm_token(self, email):
- payload = {
- 'exp': datetime.utcnow() + timedelta(days=7),
- 'iat': datetime.utcnow(),
- 'scope': 'confirm_token',
- 'sub': email
- }
- return jwt.encode(
- payload,
- self.secret,
- algorithm='HS256'
- )
- def decode_confirm_token(self, token):
- try:
- payload = jwt.decode(token, self.secret, algorithms=['HS256'])
- if payload['scope'] == 'confirm_token':
- return payload['sub']
- raise HTTPException(status_code=401, detail='Scope for the token is invalid')
- except jwt.ExpiredSignatureError:
- raise HTTPException(status_code=401, detail='Token expired')
- except jwt.InvalidTokenError:
- raise HTTPException(status_code=401, detail='Invalid token')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement