- """define methods to handle /api/v1/auth/ routes"""
- from flask import Blueprint, request, jsonify, make_response, current_app
- from API.app.v2.utils.exeptions import MalformedRequest
- from API.app.v2.models.users import Users
- from werkzeug.security import check_password_hash
- from flask_jwt_extended import jwt_required, create_access_token
- from API.app.v2.utils.validation import login_schema, signup_schema
- from cerberus import Validator
- from psycopg2 import ProgrammingError
# Blueprint for the authentication endpoints. Routes registered on it are
# served under /api/v2/auth/ (the module docstring still says v1).
BP = Blueprint(
    'auth',
    __name__,
    url_prefix='/api/v2/auth/',
)

# Message returned when a request body is missing or is not usable JSON.
INVALID_JSON_MSG = 'Malformed Request data.Please use JSON data'
@BP.errorhandler(MalformedRequest)  # register the MalformedRequest handler
def handle_malformed_request(error):
    """Convert a MalformedRequest into a JSON error response.

    The body is whatever ``error.to_dict()`` returns and the HTTP status is
    taken from ``error.status_code``.
    """
    reply = jsonify(error.to_dict())
    reply.status_code = error.status_code
    return reply
@BP.errorhandler(ProgrammingError)  # register the database error handler
def handle_database_errors(error):
    """Answer a psycopg2 ProgrammingError with a generic HTTP 500.

    The client only ever sees a fixed message; the exception text is written
    to the application log and is not echoed in the response.
    """
    # NOTE(review): only ProgrammingError is registered here. Other psycopg2
    # error classes (e.g. IntegrityError, OperationalError) are not subclasses
    # of it, so they presumably fall through to Flask's default handling -
    # confirm that path does not expose driver detail to the client.
    payload = {'message': "Application error. Please try again later."}
    reply = jsonify(payload)
    reply.status_code = 500
    # Server-side log only.
    current_app.logger.error(f'A database error occured. {error}')
    return reply
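@BP.route('login', methods=['POST'])
def login():
    """Process POST /login: check the credentials and issue an access token.

    Returns:
        200 with ``message`` and ``access_token`` when the password matches.
        400 with the validator's errors when the body fails ``login_schema``.
        401 with the same message for an unknown user and a wrong password,
        so the response body does not reveal which of the two was wrong.

    Raises:
        MalformedRequest: the body is missing, is not JSON, or is JSON but
            not an object.
    """
    # The route only accepts POST, so no method check is needed here.
    credentials = request.get_json(force=True, silent=True)
    # A JSON array or scalar is truthy but is not a document; handing it to
    # the validator raises inside Cerberus and surfaces as an unhandled 500.
    if not isinstance(credentials, dict) or not credentials:
        raise MalformedRequest(INVALID_JSON_MSG)  # return bad request

    validator = Validator(login_schema, allow_unknown=True)
    if not validator.validate(credentials):
        errors = validator.errors
        errors['message'] = 'Please correct the indicated errors'
        return make_response(jsonify(errors), 400)

    # Both fields are presumably required strings in login_schema - confirm;
    # .lower() below relies on that.
    username = credentials['username'].lower()
    password = credentials['password']

    user = Users().get_user(username)
    # NOTE(review): an unknown username returns without running a hash
    # comparison, so it answers faster than a wrong password. Comparing
    # against a fixed dummy hash in that case would even out the timing.
    # No throttling or lockout is visible in this module either - confirm
    # it is applied elsewhere.
    if user and check_password_hash(user['password'], password):
        return make_response(jsonify({
            'message': 'Login Successfull',
            'access_token': create_access_token(identity=username)
        }), 200)

    return make_response(jsonify({
        'message': 'Invalid Username or Password'
    }), 401)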
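@BP.route('signup', methods=['POST'])
def signup():
    """Process POST /signup: validate the body and create a new user.

    Returns:
        201 when the user was added.
        400 with the validator's errors when the body fails ``signup_schema``.
        409 when the username or the email address is already registered.

    Raises:
        MalformedRequest: the body is missing, is not JSON, is JSON but not
            an object, or the two password fields differ.
    """
    # The route only accepts POST, so no method check is needed here.
    credentials = request.get_json(force=True, silent=True)
    # A JSON array or scalar is truthy but is not a document; handing it to
    # the validator raises inside Cerberus and surfaces as an unhandled 500.
    if not isinstance(credentials, dict) or not credentials:
        raise MalformedRequest(INVALID_JSON_MSG)  # return bad request

    validator = Validator(signup_schema, allow_unknown=True)
    if not validator.validate(credentials):
        errors = validator.errors
        # Key was ' message' (leading space); use the same key as login().
        errors['message'] = 'Please Fix the indicated errors'
        return make_response(jsonify(errors), 400)

    password = credentials['password']
    conf_password = credentials['confirm_password']
    f_name = credentials['first_name']
    l_name = credentials['last_name']
    # Usernames and e-mail addresses are stored and compared in lower case.
    username = credentials['username'].lower()
    email = credentials['email_address'].lower()

    if password != conf_password:
        raise MalformedRequest('Passwords do not match', 400)

    users = Users()
    # NOTE(review): this is check-then-insert. Two concurrent signups for the
    # same name can both pass the lookups; rejecting the second one then
    # depends on a unique constraint in the database, and the resulting
    # driver error is presumably not a ProgrammingError - confirm it is
    # handled.
    if users.get_user(username):  # check username
        return make_response(jsonify({
            'message': 'Username already exists'
        }), 409)
    if users.get_email(email):  # check email
        return make_response(jsonify({
            'message': 'Email address already registered'
        }), 409)

    # NOTE(review): the plaintext password is passed through as-is. login()
    # verifies with check_password_hash, so add_user presumably hashes it
    # before storage - confirm, and confirm it is never logged.
    users.add_user({
        'username': username,
        'password': password,
        'email_address': email,
        'first_name': f_name,
        'last_name': l_name,
    })
    return make_response(jsonify({
        'message': 'Signup Successful'
    }), 201)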