Advertisement
k1dr0ck

fastapi with bcrypt

Jan 28th, 2024
325
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.08 KB | None | 0 0
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
# from passlib.context import CryptContext - passlib abandoned
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel
from sqlalchemy.orm import Session

import models
from database import SessionLocal, engine
  12.  
  13.  
  14. SECRET_KEY = "ABCDEFGHIJKLMNOPQRSTUVWXYZabc1234567890"
  15. ALGORITHM = "HS256"
  16.  
  17.  
  18. class CreateUser(BaseModel):
  19.     username: str
  20.     email: Optional[str]
  21.     first_name: str
  22.     last_name: str
  23.     password: str
  24.  
  25.  
  26. # bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  27. models.Base.metadata.create_all(bind=engine)
  28.  
  29. oauth2_bearer = OAuth2PasswordBearer(tokenUrl="token")
  30.  
  31.  
  32. app = FastAPI()
  33.  
  34.  
  35. def get_db():
  36.     try:
  37.         db = SessionLocal()
  38.         yield db
  39.     finally:
  40.         db.close()
  41.  
  42.  
  43. def get_password_hash(password):
  44.     # return bcrypt_context.hash(password)
  45.     pwd_bytes = password.encode('utf-8')
  46.     salt = bcrypt.gensalt()
  47.     hashed_password = bcrypt.hashpw(password=pwd_bytes, salt=salt)
  48.     return hashed_password
  49.  
  50.  
  51. def verify_password(plain_password, hashed_password):
  52.     password_byte_enc = plain_password.encode('utf-8')
  53.     return bcrypt.checkpw(password=password_byte_enc, hashed_password=hashed_password)
  54.  
  55.  
  56. def authenticate_user(username: str, password: str, db):
  57.     user = db.query(models.Users)\
  58.         .filter(models.Users.username == username)\
  59.         .first()
  60.     if not user:
  61.         return False
  62.     if not verify_password(password, user.hashed_password):
  63.         return False
  64.     return user
  65.  
  66.  
  67. def create_access_token(username: str, user_id: int, expires_delta: Optional[timedelta] = None):
  68.     encode = {"sub": username, "id": user_id}
  69.     if expires_delta:
  70.         expire = datetime.utcnow() + expires_delta
  71.     else:
  72.         expire = datetime.utcnow() + timedelta(minutes=15)
  73.     encode.update({"exp": expire})
  74.     return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM)
  75.  
  76.  
  77. async def get_current_user(token: str = Depends(oauth2_bearer)):
  78.     try:
  79.         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  80.         username: str = payload.get('sub')
  81.         user_id: int = payload.get('id')
  82.         if username is None or user_id is None:
  83.             raise get_user_exception()
  84.         return {"username": username, "id": user_id}
  85.     except JWTError:
  86.         raise get_user_exception()
  87.  
  88.  
  89. @app.post("/create/user")
  90. async def create_new_user(create_user: CreateUser, db: Session = Depends(get_db)):
  91.     create_user_model = models.Users()
  92.     create_user_model.email = create_user.email
  93.     create_user_model.username = create_user.username
  94.     create_user_model.first_name = create_user.first_name
  95.     create_user_model.last_name = create_user.last_name
  96.     hash_password = get_password_hash(create_user.password)
  97.     create_user_model.hashed_password = hash_password
  98.     create_user_model.is_active = True
  99.  
  100.     db.add(create_user_model)
  101.     db.commit()
  102.  
  103.  
  104. @app.post("/token")
  105. async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(),
  106.                                  db: Session = Depends(get_db)):
  107.     user = authenticate_user(form_data.username, form_data.password, db)
  108.     if not user:
  109.         raise token_exception()
  110.     token_expires = timedelta(minutes=20)
  111.     token = create_access_token(
  112.         user.username, user.id, expires_delta=token_expires)
  113.     return {"token": token}
  114.  
  115. # Exceptions
  116.  
  117.  
  118. def get_user_exception():
  119.     credentials_exception = HTTPException(
  120.         status_code=status.HTTP_401_UNAUTHORIZED,
  121.         detail="Could not validate credentials",
  122.         headers={"www-authenticate": "Bearer"},
  123.     )
  124.     return credentials_exception
  125.  
  126.  
  127. def token_exception():
  128.     token_exception_response = HTTPException(
  129.         status_code=status.HTTP_401_UNAUTHORIZED,
  130.         detail="Incorrect username or password",
  131.         headers={"www-authenticate": "Bearer"},
  132.     )
  133.     return token_exception_response
  134.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement