Not a member of Pastebin yet?
                        Sign Up,
                        it unlocks many cool features!                    
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import jwt, JWTError
# from passlib.context import CryptContext - passlib abandoned
from pydantic import BaseModel
from sqlalchemy.orm import Session

import models
from database import SessionLocal, engine
 - SECRET_KEY = "ABCDEFGHIJKLMNOPQRSTUVWXYZabc1234567890"
 - ALGORITHM = "HS256"
 - class CreateUser(BaseModel):
 - username: str
 - email: Optional[str]
 - first_name: str
 - last_name: str
 - password: str
 - # bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 - models.Base.metadata.create_all(bind=engine)
 - oauth2_bearer = OAuth2PasswordBearer(tokenUrl="token")
 - app = FastAPI()
 - def get_db():
 - try:
 - db = SessionLocal()
 - yield db
 - finally:
 - db.close()
 - def get_password_hash(password):
 - # return bcrypt_context.hash(password)
 - pwd_bytes = password.encode('utf-8')
 - salt = bcrypt.gensalt()
 - hashed_password = bcrypt.hashpw(password=pwd_bytes, salt=salt)
 - return hashed_password
 - def verify_password(plain_password, hashed_password):
 - password_byte_enc = plain_password.encode('utf-8')
 - return bcrypt.checkpw(password=password_byte_enc, hashed_password=hashed_password)
 - def authenticate_user(username: str, password: str, db):
 - user = db.query(models.Users)\
 - .filter(models.Users.username == username)\
 - .first()
 - if not user:
 - return False
 - if not verify_password(password, user.hashed_password):
 - return False
 - return user
 - def create_access_token(username: str, user_id: int, expires_delta: Optional[timedelta] = None):
 - encode = {"sub": username, "id": user_id}
 - if expires_delta:
 - expire = datetime.utcnow() + expires_delta
 - else:
 - expire = datetime.utcnow() + timedelta(minutes=15)
 - encode.update({"exp": expire})
 - return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM)
 - async def get_current_user(token: str = Depends(oauth2_bearer)):
 - try:
 - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
 - username: str = payload.get('sub')
 - user_id: int = payload.get('id')
 - if username is None or user_id is None:
 - raise get_user_exception()
 - return {"username": username, "id": user_id}
 - except JWTError:
 - raise get_user_exception()
 - @app.post("/create/user")
 - async def create_new_user(create_user: CreateUser, db: Session = Depends(get_db)):
 - create_user_model = models.Users()
 - create_user_model.email = create_user.email
 - create_user_model.username = create_user.username
 - create_user_model.first_name = create_user.first_name
 - create_user_model.last_name = create_user.last_name
 - hash_password = get_password_hash(create_user.password)
 - create_user_model.hashed_password = hash_password
 - create_user_model.is_active = True
 - db.add(create_user_model)
 - db.commit()
 - @app.post("/token")
 - async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(),
 - db: Session = Depends(get_db)):
 - user = authenticate_user(form_data.username, form_data.password, db)
 - if not user:
 - raise token_exception()
 - token_expires = timedelta(minutes=20)
 - token = create_access_token(
 - user.username, user.id, expires_delta=token_expires)
 - return {"token": token}
 - # Exceptions
 - def get_user_exception():
 - credentials_exception = HTTPException(
 - status_code=status.HTTP_401_UNAUTHORIZED,
 - detail="Could not validate credentials",
 - headers={"www-authenticate": "Bearer"},
 - )
 - return credentials_exception
 - def token_exception():
 - token_exception_response = HTTPException(
 - status_code=status.HTTP_401_UNAUTHORIZED,
 - detail="Incorrect username or password",
 - headers={"www-authenticate": "Bearer"},
 - )
 - return token_exception_response
 
Advertisement
 
                    Add Comment                
                
                        Please, Sign In to add comment