Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from fastapi import FastAPI, Depends, HTTPException, status
- from pydantic import BaseModel
- from typing import Optional
- import models
- import bcrypt
- # from passlib.context import CryptContext - passlib abandoned
- from sqlalchemy.orm import Session
- from database import SessionLocal, engine
- from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
- from datetime import datetime, timedelta
- from jose import jwt, JWTError
- SECRET_KEY = "ABCDEFGHIJKLMNOPQRSTUVWXYZabc1234567890"
- ALGORITHM = "HS256"
- class CreateUser(BaseModel):
- username: str
- email: Optional[str]
- first_name: str
- last_name: str
- password: str
- # bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- models.Base.metadata.create_all(bind=engine)
- oauth2_bearer = OAuth2PasswordBearer(tokenUrl="token")
- app = FastAPI()
- def get_db():
- try:
- db = SessionLocal()
- yield db
- finally:
- db.close()
- def get_password_hash(password):
- # return bcrypt_context.hash(password)
- pwd_bytes = password.encode('utf-8')
- salt = bcrypt.gensalt()
- hashed_password = bcrypt.hashpw(password=pwd_bytes, salt=salt)
- return hashed_password
- def verify_password(plain_password, hashed_password):
- password_byte_enc = plain_password.encode('utf-8')
- return bcrypt.checkpw(password=password_byte_enc, hashed_password=hashed_password)
- def authenticate_user(username: str, password: str, db):
- user = db.query(models.Users)\
- .filter(models.Users.username == username)\
- .first()
- if not user:
- return False
- if not verify_password(password, user.hashed_password):
- return False
- return user
- def create_access_token(username: str, user_id: int, expires_delta: Optional[timedelta] = None):
- encode = {"sub": username, "id": user_id}
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=15)
- encode.update({"exp": expire})
- return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM)
- async def get_current_user(token: str = Depends(oauth2_bearer)):
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get('sub')
- user_id: int = payload.get('id')
- if username is None or user_id is None:
- raise get_user_exception()
- return {"username": username, "id": user_id}
- except JWTError:
- raise get_user_exception()
- @app.post("/create/user")
- async def create_new_user(create_user: CreateUser, db: Session = Depends(get_db)):
- create_user_model = models.Users()
- create_user_model.email = create_user.email
- create_user_model.username = create_user.username
- create_user_model.first_name = create_user.first_name
- create_user_model.last_name = create_user.last_name
- hash_password = get_password_hash(create_user.password)
- create_user_model.hashed_password = hash_password
- create_user_model.is_active = True
- db.add(create_user_model)
- db.commit()
- @app.post("/token")
- async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(),
- db: Session = Depends(get_db)):
- user = authenticate_user(form_data.username, form_data.password, db)
- if not user:
- raise token_exception()
- token_expires = timedelta(minutes=20)
- token = create_access_token(
- user.username, user.id, expires_delta=token_expires)
- return {"token": token}
- # Exceptions
- def get_user_exception():
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"www-authenticate": "Bearer"},
- )
- return credentials_exception
- def token_exception():
- token_exception_response = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- headers={"www-authenticate": "Bearer"},
- )
- return token_exception_response
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement