Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- from functools import wraps
- from flask import Flask, jsonify, request, make_response
- from flask_sqlalchemy import SQLAlchemy
- from flask_marshmallow import Marshmallow
- from werkzeug.security import (
- generate_password_hash,
- check_password_hash
- )
- from flask_jwt_extended import (
- JWTManager,
- jwt_required,
- create_access_token,
- get_jwt_identity
- )
- from flask_jwt_extended.utils import decode_token
- app = Flask(__name__)
- app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
- app.config['SECRET_KEY'] = 'super-secret-key'
- app.config['JWT_SECRET_KEY'] = 'super-secret-key'
- db = SQLAlchemy(app)
- ma = Marshmallow(app)
- jwt = JWTManager(app)
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- username = db.Column(db.String, nullable=False)
- password_hash = db.Column(db.String, nullable=False)
- admin = db.Column(db.Boolean, default=False)
- def __init__(self, username, password):
- self.username = username
- self.set_password(password)
- def set_password(self, password):
- self.password_hash = generate_password_hash(password)
- def check_password(self, password):
- return check_password_hash(self.password_hash, password)
- class UserSchema(ma.Schema):
- class Meta:
- fields = ['username', 'admin', '_links']
- _links = ma.Hyperlinks({
- 'self': ma.URLFor('get_user', id='<id>'),
- 'collection': ma.URLFor('get_users')
- })
- user_schema = UserSchema()
- users_schema = UserSchema(many=True)
- def current_user(func):
- @wraps(func)
- def wrapped(*args, **kwargs):
- username = get_jwt_identity()
- current_user = User.query.filter(User.username==username).one()
- return func(current_user, *args, **kwargs)
- return wrapped
- def admin_only(func):
- @wraps(func)
- def wrapped(current_user, *args, **kwargs):
- if not current_user.admin:
- return jsonify(msg='Must be an admin'), 401
- return func(current_user, *args, **kwargs)
- return wrapped
- @app.route('/', methods=['GET'])
- def index():
- return jsonify('Basic JWT app')
- @app.route('/users', methods=['GET'])
- def get_users():
- users = User.query.all()
- return users_schema.jsonify(users)
- @app.route('/users/<int:id>', methods=['GET'])
- def get_user(id):
- user = User.query.get(id)
- return user_schema.jsonify(user)
- @app.route('/users', methods=['POST'])
- def create_user():
- data = request.get_json()
- user = User(
- username = data['username'],
- password = data['password'])
- db.session.add(user)
- db.session.commit()
- return user_schema.jsonify(user)
- @app.route('/auth', methods=['POST'])
- def get_token():
- data = request.json
- username = data.get('username')
- password = data.get('password')
- if not username and password:
- return jsonify({'error': 'Missing username or password'}), 400
- user = User.query.filter(User.username==username).one()
- if not (user and user.check_password(password)):
- return jsonify({'error': 'Incorrect username or password'}), 401
- access_token = create_access_token(identity=username)
- return jsonify(access_token=access_token), 200
- @app.route('/auth/exp', methods=['GET'])
- @jwt_required
- def get_exp():
- auth = request.headers['Authorization']
- token = auth.split(' ')[-1]
- decoded_token = decode_token(token)
- exp = decoded_token.get('exp')
- expires_at = datetime.datetime.fromtimestamp(exp).strftime('%c')
- return jsonify(expires_at=expires_at)
- @app.route('/protected', methods=['GET'])
- @jwt_required
- @current_user
- def protected(current_user):
- return jsonify(logged_in_as=current_user.username), 200
- @app.route('/admin', methods=['GET'])
- @jwt_required
- @current_user
- @admin_only
- def admin(current_user):
- return '', 204
- if __name__ == '__main__':
- app.run(debug=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement