Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from os import environ
- from apppack.models import app
def _main():
    """Start the application by calling ``app.run()`` with its defaults."""
    app.run()


# Only start the server when this file is executed as a script.
if __name__ == '__main__':
    _main()
- import jwt
- import datetime
- import app, db, bcrypt
- import config
class User(db.Model):
    """User model for storing user-related details.

    Besides the column definitions, this class issues and validates the JWT
    auth tokens that identify a user.
    """

    __tablename__ = "ConfuUsers"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String(255), unique=True, nullable=False)
    # Holds the output of bcrypt.generate_password_hash, never the plain text.
    password = db.Column(db.String(255), nullable=False)
    registered_on = db.Column(db.DateTime, nullable=False)
    admin = db.Column(db.Boolean, nullable=False, default=False)

    def __init__(self, email, password, admin=False):
        """Create a user, hashing ``password`` before it is stored.

        :param email: value for the unique ``email`` column
        :param password: plain-text password; only its hash is kept
        :param admin: value for the ``admin`` flag (defaults to False)
        """
        self.email = email
        self.password = bcrypt.generate_password_hash(
            password, app.config.get('BCRYPT_LOG_ROUNDS')
        ).decode()
        self.registered_on = datetime.datetime.now()
        self.admin = admin

    def encode_auth_token(self, user_id):
        """Generate a signed auth token for ``user_id``.

        The token is signed with HS256 using the app's ``SECRET_KEY`` and
        expires one day and 500 seconds after it is issued.

        :param user_id: identifier stored in the token's ``sub`` claim
        :return: the encoded token exactly as returned by ``jwt.encode``
        :raises Exception: errors from ``jwt.encode`` propagate to the caller
            instead of being returned as if they were a token
        """
        # Take the timestamp once so 'iat' and 'exp' are derived from the
        # same instant.
        issued_at = datetime.datetime.utcnow()
        payload = {
            'exp': issued_at + datetime.timedelta(days=1, seconds=500),
            'iat': issued_at,
            # NOTE(review): recent PyJWT releases appear to require 'sub' to
            # be a string when decoding - confirm against the pinned version.
            'sub': user_id,
        }
        return jwt.encode(
            payload,
            app.config.get('SECRET_KEY'),
            algorithm='HS256',
        )

    @staticmethod
    def decode_auth_token(auth_token):
        """Validate ``auth_token`` and return the subject it was issued for.

        :param auth_token: encoded JWT to validate
        :return: the token's ``sub`` claim on success, otherwise a string
            describing why the token was rejected (expired, invalid or
            blacklisted)
        """
        try:
            # Pin the accepted algorithm to the one used for signing; an
            # unpinned decode is rejected by PyJWT 2.x and unsafe on 1.x.
            payload = jwt.decode(
                auth_token,
                app.config.get('SECRET_KEY'),
                algorithms=['HS256'],
            )
        except jwt.ExpiredSignatureError:
            return 'Signature expired. Please log in again.'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please log in again.'

        if BlacklistToken.check_blacklist(auth_token):
            return 'Token blacklisted. Please log in again.'
        return payload['sub']
class BlacklistToken(db.Model):
    """Token model for storing blacklisted JWT tokens."""

    __tablename__ = 'blacklist_tokens'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    token = db.Column(db.String(500), unique=True, nullable=False)
    blacklisted_on = db.Column(db.DateTime, nullable=False)

    def __init__(self, token):
        """Record ``token`` as blacklisted at the current local time.

        :param token: the encoded auth token to blacklist
        """
        self.token = token
        self.blacklisted_on = datetime.datetime.now()

    def __repr__(self):
        # The previous format string never printed the id and left the
        # angle bracket unclosed.
        return '<id: {} token: {}>'.format(self.id, self.token)

    @staticmethod
    def check_blacklist(auth_token):
        """Return True when ``auth_token`` has a row in the blacklist table.

        :param auth_token: token to look up; converted with ``str`` first
        :return: bool
        """
        match = BlacklistToken.query.filter_by(token=str(auth_token)).first()
        return match is not None
- from flask import Blueprint, request, make_response, jsonify
- from flask.views import MethodView
- import bcrypt, db
- from apppack.models import User, BlacklistToken
- auth_blueprint = Blueprint('auth', __name__)
class RegisterAPI(MethodView):
    """User registration resource."""

    def post(self):
        """Register a new user from the JSON body's ``email``/``password``.

        :return: ``(response, status)``: 201 with an auth token on success,
            202 when the email is already registered, 401 when anything
            fails while creating the user or its token
        """
        # A missing JSON body yields None; fall back to an empty mapping so
        # the request ends in the regular failure response instead of an
        # AttributeError.
        post_data = request.get_json() or {}
        email = post_data.get('email')

        # Only existence matters, so fetch at most one row.
        if User.query.filter_by(email=email).first() is not None:
            response_object = {
                'status': 'fail',
                'message': 'User already exists. Please Log in.',
            }
            return make_response(jsonify(response_object)), 202

        try:
            user = User(email=email, password=post_data.get('password'))
            db.session.add(user)
            db.session.commit()

            auth_token = user.encode_auth_token(user.id)
            if isinstance(auth_token, Exception):
                # encode_auth_token may hand back the error it caught.
                raise auth_token
            if isinstance(auth_token, bytes):
                # PyJWT 1.x returns bytes, 2.x returns str; accept both.
                auth_token = auth_token.decode()

            response_object = {
                'status': 'success',
                'message': 'Successfully registered.',
                'auth_token': auth_token,
            }
            return make_response(jsonify(response_object)), 201
        except Exception:
            # Leave the session usable for the next request.
            db.session.rollback()
            response_object = {
                'status': 'fail',
                'message': 'Some error occurred. Please try again.',
            }
            return make_response(jsonify(response_object)), 401
- #-------------------------------------------------------------------- test
- #post_data = request.get_json()
- ## check if user already exists
- #user = User(
- # email=post_data.get('email'),
- # password=post_data.get('password'),
- # )
- # # insert the user
- #db.session.add(user)
- #db.session.commit()
- #responseObject = {
- # 'status': 'success',
- # 'message': 'Successfully registered.',
- # 'auth_token': post_data.get('email')
- # }
- #return make_response(jsonify(responseObject)), 201
- #-----------------------------------------------------------------------test
class LoginAPI(MethodView):
    """User login resource."""

    def post(self):
        """Log a user in from the JSON body's ``email``/``password``.

        :return: ``(response, status)``: 200 with an auth token on success,
            404 when the email is unknown or the password does not match,
            500 when an exception occurs while handling the request
        """
        post_data = request.get_json()
        try:
            user = User.query.filter_by(
                email=post_data.get('email')
            ).first()
            credentials_ok = user and bcrypt.check_password_hash(
                user.password, post_data.get('password')
            )
            if not credentials_ok:
                response_object = {
                    'status': 'fail',
                    'message': 'User does not exist.',
                }
                return make_response(jsonify(response_object)), 404

            auth_token = user.encode_auth_token(user.id)
            if isinstance(auth_token, Exception):
                # encode_auth_token may hand back the error it caught.
                raise auth_token
            if not auth_token:
                # Previously this case fell through and the view returned
                # None; route it to the generic failure response instead.
                raise RuntimeError('auth token could not be generated')
            if isinstance(auth_token, bytes):
                # PyJWT 1.x returns bytes, 2.x returns str; accept both.
                auth_token = auth_token.decode()

            response_object = {
                'status': 'success',
                'message': 'Successfully logged in.',
                'auth_token': auth_token,
            }
            return make_response(jsonify(response_object)), 200
        except Exception as e:
            print(e)
            response_object = {
                'status': 'fail',
                'message': 'Try again',
            }
            return make_response(jsonify(response_object)), 500
class UserAPI(MethodView):
    """User resource: reports the user an auth token belongs to."""

    def get(self):
        """Return the details of the user identified by the auth token.

        The token is read verbatim from the ``Authorization`` header.

        :return: ``(response, status)``: 200 with the user's id, email,
            admin flag and registration time, or 401 when the header is
            missing, the token is rejected, or no user matches the token
        """
        auth_token = request.headers.get('Authorization') or ''
        if not auth_token:
            response_object = {
                'status': 'fail',
                'message': 'Provide a valid auth token.',
            }
            return make_response(jsonify(response_object)), 401

        # decode_auth_token returns a str message on failure and the
        # token's subject otherwise.
        resp = User.decode_auth_token(auth_token)
        if isinstance(resp, str):
            response_object = {
                'status': 'fail',
                'message': resp,
            }
            return make_response(jsonify(response_object)), 401

        user = User.query.filter_by(id=resp).first()
        if user is None:
            # The token is well-formed but no longer maps to a stored user;
            # reject it rather than failing on a None attribute access.
            response_object = {
                'status': 'fail',
                'message': 'Invalid token. Please log in again.',
            }
            return make_response(jsonify(response_object)), 401

        response_object = {
            'status': 'success',
            'data': {
                'user_id': user.id,
                'email': user.email,
                'admin': user.admin,
                'registered_on': user.registered_on,
            },
        }
        return make_response(jsonify(response_object)), 200
class LogoutAPI(MethodView):
    """Logout resource: blacklists the presented auth token."""

    def post(self):
        """Blacklist the token given in the ``Authorization`` header.

        The token is read verbatim from the header.

        :return: ``(response, status)``: 200 once the token is stored in the
            blacklist (or with ``status: fail`` when storing it raised),
            401 when the token is rejected, 403 when no token was sent
        """
        auth_token = request.headers.get('Authorization') or ''
        if not auth_token:
            response_object = {
                'status': 'fail',
                'message': 'Provide a valid auth token.',
            }
            return make_response(jsonify(response_object)), 403

        # decode_auth_token returns a str message on failure and the
        # token's subject otherwise.
        resp = User.decode_auth_token(auth_token)
        if isinstance(resp, str):
            response_object = {
                'status': 'fail',
                'message': resp,
            }
            return make_response(jsonify(response_object)), 401

        blacklist_token = BlacklistToken(token=auth_token)
        try:
            db.session.add(blacklist_token)
            db.session.commit()
        except Exception as e:
            # Leave the session usable for the next request.
            db.session.rollback()
            response_object = {
                'status': 'fail',
                # An exception object is not JSON serializable; send its text.
                'message': str(e),
            }
            return make_response(jsonify(response_object)), 200

        response_object = {
            'status': 'success',
            'message': 'Successfully logged out.',
        }
        return make_response(jsonify(response_object)), 200
# Build one view callable per API resource.
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')
user_view = UserAPI.as_view('user_api')
logout_view = LogoutAPI.as_view('logout_api')

# Attach each view to its endpoint on the blueprint, in declaration order.
for _rule, _view, _methods in (
    ('/auth/register', registration_view, ['POST']),
    ('/auth/login', login_view, ['POST']),
    ('/auth/status', user_view, ['GET']),
    ('/auth/logout', logout_view, ['POST']),
):
    auth_blueprint.add_url_rule(_rule, view_func=_view, methods=_methods)
- import os
- from flask import Flask
- from flask_bcrypt import Bcrypt
- from flask_sqlalchemy import SQLAlchemy
- from flask_cors import CORS
- from apppack.views import auth_blueprint
# Create the Flask application and the extensions bound to it.
application = Flask(__name__)
# NOTE(review): this secret is committed in source; the from_object() call
# below replaces it when the loaded config class defines SECRET_KEY.
application.config['SECRET_KEY'] = 'yn=xc5xfanBxb8tnx83xbefx8axe3xddEx17x06xc9x96x8ec|'
CORS(application)

# Import path of the config class to load; APP_SETTINGS overrides the default.
# NOTE(review): the default starts with a dot, which looks like a relative
# import path - confirm it resolves from where the app is started.
app_settings = os.getenv(
    'APP_SETTINGS',
    '.config.DevelopmentConfig'
)
# The Flask object in this module is named `application`; the previous
# reference to `app` was an undefined name here.
application.config.from_object(app_settings)

bcrypt = Bcrypt(application)
db = SQLAlchemy(application)

application.register_blueprint(auth_blueprint)
- import os
# Absolute path of the directory that contains this module.
basedir = os.path.dirname(os.path.abspath(__file__))

# Database name, and the connection URI prefix it is appended to.
# NOTE(review): the URI embeds a username and password in source.
# Local alternative kept from the original comments:
#   base 'postgresql://postgres:admin@localhost/', database 'flask_jwt_auth'
database_name = 'confudb'
postgres_local_base = 'postgresql://confudb:123123123!@confudb.cusmbtiketg6.ap-south-1.rds.amazonaws.com/'
class BaseConfig:
    """Settings shared by every configuration class."""

    DEBUG = False
    CSRF_ENABLED = True
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    BCRYPT_LOG_ROUNDS = 13
    # Taken from the SECRET_KEY environment variable when it is set.
    SECRET_KEY = os.environ.get(
        'SECRET_KEY',
        'yn=xc5xfanBxb8tnx83xbefx8axe3xddEx17x06xc9x96x8ec|',
    )
class DevelopmentConfig(BaseConfig):
    """Settings used during development."""

    # Connection URI: the base URI followed by the database name.
    SQLALCHEMY_DATABASE_URI = '{}{}'.format(postgres_local_base, database_name)
    # Fewer rounds than the base configuration.
    BCRYPT_LOG_ROUNDS = 4
    DEBUG = True
    DEVELOPMENT = True
class TestingConfig(BaseConfig):
    """Settings used when running tests."""

    # DEBUG is inherited from BaseConfig (False).
    PRESERVE_CONTEXT_ON_EXCEPTION = False
    # The base URI is used as-is, with no database name appended.
    SQLALCHEMY_DATABASE_URI = postgres_local_base
    # Fewer rounds than the base configuration.
    BCRYPT_LOG_ROUNDS = 4
    TESTING = True
class ProductionConfig(BaseConfig):
    """Production configuration."""

    # Read the key from the environment when set, as BaseConfig does; the
    # literal is only the fallback so existing deployments keep working.
    SECRET_KEY = os.getenv(
        'SECRET_KEY',
        'yn=xc5xfanBxb8tnx83xbefx8axe3xddEx17x06xc9x96x8ec|',
    )
    DEBUG = False
    # The scheme separator must be '://'; the previous value 'postgresql//'
    # lacked the colon and is not a valid database URL.
    # NOTE(review): the URI embeds a username and password in source.
    SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:123123123!@confudb.cusmbtiketg6.ap-south-1.rds.amazonaws.com/confudb'
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement