Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- import decimal
- import json
- import database_connection
- import flask
- from flask import request, jsonify, Response
- from flask_cors import CORS
- from sqlalchemy.exc import IntegrityError
# Data-access object through which the code below runs its SQL.
dao = database_connection.DataAccessObject()

# Flask application; CORS is enabled for every route and every origin.
app = flask.Flask(__name__)
cors = CORS(app, resources={r"/*": {"origins": "*"}})
app.config.update(DEBUG=True)
def dict_factory(cursor, row):
    """Map one result row to a dict keyed by column name.

    ``cursor.description`` supplies one entry per column whose first item is
    the column name; ``row`` supplies the values in the same order.
    """
    return {
        column[0]: row[position]
        for position, column in enumerate(cursor.description)
    }
def alchemy_encoder(obj):
    """Convert values that ``json.dumps`` cannot serialize on its own.

    Dates (and datetimes, which subclass ``datetime.date``) become ISO-8601
    strings and ``decimal.Decimal`` becomes ``float``.  Any other type yields
    ``None``, which ``json.dumps`` then writes as ``null``.
    """
    if isinstance(obj, datetime.date):
        return obj.isoformat()
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    return None
class InvalidUsage(Exception):
    """Exception carrying a message, an HTTP status code and optional extras.

    ``status_code`` defaults to 400 at class level and is overridden per
    instance only when one is passed to the constructor.
    """

    status_code = 400

    def __init__(self, message, status_code=None, payload=None):
        super().__init__()
        self.message = message
        self.payload = payload
        if status_code is not None:
            self.status_code = status_code

    def to_dict(self):
        """Return the payload (if any) as a dict with ``message`` added."""
        body = dict(self.payload) if self.payload else {}
        body['message'] = self.message
        return body
@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
    """Render an ``InvalidUsage`` as a JSON body with the error's status code."""
    reply = jsonify(error.to_dict())
    reply.status_code = error.status_code
    return reply
@app.route('/', methods=['GET'])
def home():
    """Serve the static HTML landing page of the API."""
    heading = '<h1>RecipeX API</h1>'
    greeting = '<p>Merry Christmas everybody!</p>'
    return heading + '\n' + greeting
@app.route('/api/v1/all', methods=['GET'])
def api_users():
    """Return every row of ``users`` as a JSON array, without passwords.

    NOTE(review): the endpoint performs no session check, so any caller can
    list all users -- confirm that this is intended.
    """
    users = dao.json_query('SELECT * FROM users;')
    for user in users:
        # ``SELECT *`` includes the password column; never send it to clients
        # (api_login strips it from its row in the same way).
        user.pop('password', None)
    return Response(json.dumps(users, default=alchemy_encoder), mimetype='application/json')
# Fetch session information
@app.route('/api/v1/session-info', methods=['GET'])
def api_session_info():
    """Return the ``sessions`` row matching the ``session_key`` query argument.

    Responds with the first matching row as JSON, or with an empty 404 when
    the argument is missing or no row matches.
    """
    session_key = request.args.get('session_key')
    if not session_key:
        # Nothing to look up; previously ``None`` was spliced into the SQL
        # text and the query itself failed.
        return Response("", status=404, mimetype="application/json")
    # Embed the key as a quoted literal with inner quotes doubled so it cannot
    # terminate the literal (assumes PostgreSQL's default
    # standard_conforming_strings=on).  Quotes a client wrapped around the key
    # are dropped first, because the value used to be inserted verbatim.
    key_literal = "'" + session_key.strip("'").replace("'", "''") + "'"
    result = dao.json_query("select * from sessions where session_key = {}".format(key_literal))
    if not result:
        return Response("", status=404, mimetype="application/json")
    return Response(json.dumps(result[0], default=alchemy_encoder), mimetype="application/json")
@app.route('/api/v1/user/add', methods=['POST'])
def api_register():
    """Create a user from the JSON body and log the new user in.

    Reads ``login`` and ``password`` (required) plus ``email`` and ``role``
    (optional; ``role`` defaults to ``'user'``).  Answers 409 with
    ``{"email": true}`` or ``{"login": true}`` when that value is already
    taken; otherwise inserts the row and returns the result of ``api_login``.

    Raises:
        InvalidUsage: 400 when the body is not a JSON object containing
            ``login`` and ``password``.
    """
    params = request.get_json(silent=True)
    if not isinstance(params, dict) or 'login' not in params or 'password' not in params:
        raise InvalidUsage("", status_code=400)

    def quoted(value):
        # Single-quoted SQL literal with embedded quotes doubled, so request
        # data cannot break out of the literal (assumes PostgreSQL's default
        # standard_conforming_strings=on).
        return "'" + str(value).replace("'", "''") + "'"

    login = params['login']
    # NOTE(review): the password is stored exactly as received (plain text).
    password = params['password']
    email = params.get('email') or ''
    # NOTE(review): the role is taken from the request, so a caller can ask
    # for any role -- confirm that this is intended.
    role = params.get('role', 'user')

    if email:
        taken = dao.json_query("select * from users where email = {}".format(quoted(email)))
        if taken:
            return Response(json.dumps({'email': True}), status=409, mimetype='application/json')

    taken = dao.json_query("select * from users where login = {}".format(quoted(login)))
    if taken:
        return Response(json.dumps({'login': True}), status=409, mimetype='application/json')

    dao.run_query(
        """insert into users(login,email,password,role)
        values({login},{email},{password},{role})
        """.format(login=quoted(login), email=quoted(email),
                   password=quoted(password), role=quoted(role)))
    # The login view reads the same request body and opens the session.
    return api_login()
@app.route('/api/v1/login', methods=['POST', 'GET'])
def api_login():
    """Check the JSON ``login``/``password`` pair and open a 7-day session.

    Returns:
        JSON with the new session's ``session_key`` and ``expires_at``.

    Raises:
        InvalidUsage: 403 when the credentials are missing or do not match.
    """
    params = request.get_json(silent=True)
    if not isinstance(params, dict) or 'login' not in params or 'password' not in params:
        raise InvalidUsage('Zły login i/ lub hasło', status_code=403)

    def quoted(value):
        # Single-quoted SQL literal with embedded quotes doubled, so request
        # data cannot break out of the literal (assumes PostgreSQL's default
        # standard_conforming_strings=on).
        return "'" + str(value).replace("'", "''") + "'"

    users = dao.json_query(
        "SELECT * FROM USERS where login = {} and password = {}".format(
            quoted(params['login']), quoted(params['password'])))
    if not users:
        raise InvalidUsage('Zły login i/ lub hasło', status_code=403)

    # The id comes from the database row, not from the request.
    query = """insert into sessions(user_id, expires_at)
        values({id}, now() + interval '7 days') returning session_key, expires_at
        """.format(id=users[0]['id'])
    session = dao.json_query(query)[0]
    return Response(json.dumps(session, default=alchemy_encoder), mimetype='application/json')
# Logout - delete the session
@app.route('/api/v1/logout', methods=['POST', 'GET'])
def api_logout():
    """Delete the session named by the ``Authorization`` header.

    Returns an empty 204 response.

    Raises:
        InvalidUsage: 404 when the header is absent.
    """
    session_key = request.headers.get('Authorization')
    if session_key is None:
        raise InvalidUsage("", status_code=404)
    # Embed the key as a quoted literal with inner quotes doubled so it cannot
    # terminate the literal (assumes PostgreSQL's default
    # standard_conforming_strings=on).  Quotes a client wrapped around the key
    # are dropped first, because the value used to be inserted verbatim.
    key_literal = "'" + session_key.strip("'").replace("'", "''") + "'"
    dao.json_query("DELETE FROM sessions WHERE session_key = {}".format(key_literal))
    return Response("", status=204)
# Fetch information about the user that owns the current session
@app.route('/api/v1/user-info', methods=['GET'])
def current_user_info():
    """Return login, email, recommendations and product preferences as JSON.

    The user is the one resolved by ``get_current_session_user``.

    Raises:
        InvalidUsage: 404 when there is no current session user or the user
            row does not exist.  Both cases raise, so the view never falls
            through without a response.
    """
    user_id = get_current_session_user()[0]
    if not user_id:
        raise InvalidUsage('Brak użytkownika', status_code=404)

    users = dao.json_query("select login, email from users where id = {user_id};".format(user_id=user_id))
    if not users:
        raise InvalidUsage('Brak użytkownika', status_code=404)
    user = users[0]

    recommendations = dao.json_query(
        "select recipe_id, rank_position from recommendations where user_id = {}".format(user_id)
    )
    recommended = []
    for row in recommendations:
        # Replace the bare recipe id with the recipe's short description.
        recipe_id = row.pop('recipe_id')
        recipe_data = json.loads(Recipe(recipe_id).get_short_data())
        recommended.append({**row, **recipe_data})
    user['recommended_recipes'] = recommended

    preferred = dao.json_query(
        """select b.* from preferences a join products b on a.product_id = b.id
        where a.user_id = {} and rating;""".format(user_id)
    )
    user['preferred_products'] = list(preferred)

    not_preferred = dao.json_query(
        """select b.* from preferences a join products b on a.product_id = b.id
        where a.user_id = {} and not rating;""".format(user_id)
    )
    user['not_preferred_products'] = list(not_preferred)

    return Response(json.dumps(user, default=alchemy_encoder), mimetype='application/json')
# Password change
@app.route('/api/v1/password_change/', methods=['PUT', 'GET'])
def api_pass_change():
    """Replace the session user's password when the current one matches.

    Reads the form fields ``currentPassword`` and ``newPassword`` and returns
    an empty 204 response on success.

    Raises:
        InvalidUsage: 401 when there is no running session or the current
            password does not match; 405 when the update raises
            ``IntegrityError``.
    """
    user_id, msg = get_current_session_user()
    if user_id is None or msg != "session_running":
        # user not logged in
        raise InvalidUsage("", status_code=401)

    def quoted(value):
        # Single-quoted SQL literal with embedded quotes doubled, so request
        # data cannot break out of the literal (assumes PostgreSQL's default
        # standard_conforming_strings=on).
        return "'" + str(value).replace("'", "''") + "'"

    current_password = request.form['currentPassword']
    new_password = request.form['newPassword']
    try:
        updated = dao.json_query(
            "update users set password = {} where id = {} and password = {} returning id".format(
                quoted(new_password), user_id, quoted(current_password)
            )
        )
    except IntegrityError:
        raise InvalidUsage('Nowe haslo jest nieprawidlowe', 405)
    if not updated:
        # No row matched the id together with the current password.
        raise InvalidUsage('Nieprawidlowe haslo', 401)
    return '', 204
@app.route('/api/v1/category/add', methods=["POST"])
def api_add_category():
    """Insert a category from the form fields ``id`` and ``name`` (admin only).

    Returns the inserted row as JSON, or 409 with ``{"id": true}`` when a
    category with that id already exists.

    Raises:
        InvalidUsage: 401 without a running session; 403 when the user is
            missing or is not an admin.
    """
    user_id, msg = get_current_session_user()
    if not user_id or msg != "session_running":
        # user not logged in
        raise InvalidUsage("", status_code=401)
    user_role_result = dao.json_query("select role from users where id = {}".format(user_id))
    if not user_role_result or user_role_result[0]['role'] != 'admin':
        # no such user, or the user is not an admin
        raise InvalidUsage("", status_code=403)

    def quoted(value):
        # Single-quoted SQL literal with embedded quotes doubled, so request
        # data cannot break out of the literal (assumes PostgreSQL's default
        # standard_conforming_strings=on).
        return "'" + str(value).replace("'", "''") + "'"

    name = quoted(request.form['name'])
    _id = quoted(request.form['id'])
    if dao.json_query("select name from categories where id = {}".format(_id)):
        # a category with this id already exists
        return Response(json.dumps({'id': True}), status=409)
    result = dao.json_query("INSERT INTO categories(id, name) VALUES({}, {}) returning *".format(_id, name))
    return Response(json.dumps(result[0]), mimetype='application/json')
@app.route('/api/v1/product/all', methods=["GET"])
def api_get_products():
    """Return every row of the ``products`` table as a JSON array."""
    products = dao.json_query("select * from products")
    body = json.dumps(products)
    return Response(body, mimetype='application/json')
@app.route('/api/v1/product/add', methods=["POST"])
def api_add_product():
    """Insert a product from the form fields ``name`` and ``measurement``.

    Only moderators and admins may add products.  Returns the inserted row as
    JSON.

    Raises:
        InvalidUsage: 401 without a running session; 403 when the user is
            missing or is neither a moderator nor an admin.
    """
    user_id, msg = get_current_session_user()
    if not user_id or msg != "session_running":
        # user not logged in
        raise InvalidUsage("", status_code=401)
    user_role_result = dao.json_query("select role from users where id = {}".format(user_id))
    if not user_role_result or user_role_result[0]['role'] not in ['moderator', 'admin']:
        # no such user, or the user is neither an admin nor a moderator
        raise InvalidUsage("", status_code=403)

    def quoted(value):
        # Single-quoted SQL literal with embedded quotes doubled, so request
        # data cannot break out of the literal (assumes PostgreSQL's default
        # standard_conforming_strings=on).
        return "'" + str(value).replace("'", "''") + "'"

    name = quoted(request.form['name'])
    measurement = quoted(request.form['measurement'])
    result = dao.json_query(
        "INSERT INTO products(id, name, measurement) VALUES(default, {}, {}) returning *".format(
            name, measurement))
    return Response(json.dumps(result[0]), mimetype='application/json')
@app.route('/api/v1/product/edit/', methods=["POST"])
def api_edit_product():
    """Update ``name`` and/or ``measurement`` of the product given by ``?id=``.

    Form fields that are absent keep the value currently stored.  Only
    moderators and admins may edit.  Returns the updated row as JSON.

    Raises:
        InvalidUsage: 401 without a running session; 403 when the user is
            missing or lacks the role; 404 when ``id`` is missing or no such
            product exists.
    """
    user_id, msg = get_current_session_user()
    if not user_id or msg != "session_running":
        # user not logged in
        raise InvalidUsage("", status_code=401)
    user_role_result = dao.json_query("select role from users where id = {}".format(user_id))
    if not user_role_result or user_role_result[0]['role'] not in ['moderator', 'admin']:
        # no such user, or the user is neither an admin nor a moderator
        raise InvalidUsage("", status_code=403)

    def quoted(value):
        # SQL literal for a value: NULL stays NULL, everything else becomes a
        # single-quoted literal with embedded quotes doubled (assumes
        # PostgreSQL's default standard_conforming_strings=on).
        if value is None:
            return "null"
        return "'" + str(value).replace("'", "''") + "'"

    product_id = request.args.get('id')
    if product_id is None:
        # Without an id there is no product to edit.
        raise InvalidUsage(json.dumps({'id': True}), status_code=404)
    id_literal = quoted(product_id)

    product_result = dao.json_query("select * from products where id = {}".format(id_literal))
    if not product_result:
        # no such product
        raise InvalidUsage(json.dumps({'id': True}), status_code=404)

    current = product_result[0]
    name = request.form.get('name', current['name'])
    measurement = request.form.get('measurement', current['measurement'])
    result = dao.json_query(
        "update products set name = {}, measurement = {} where id = {} returning *".format(
            quoted(name), quoted(measurement), id_literal))
    return Response(json.dumps(result[0]), mimetype='application/json')
@app.route('/api/v1/recipe/', methods=['GET'])
def recipe_data():
    """Return the full data of the recipe given by the ``id`` query argument.

    Raises:
        InvalidUsage: 404 when ``id`` is missing or is not an integer.
            ``Recipe.get_data`` raises the same error when no such recipe
            exists.
    """
    try:
        # Recipe splices its id into SQL unquoted, so only a real integer may
        # get through.
        recipe_id = int(request.args.get('id'))
    except (TypeError, ValueError):
        raise InvalidUsage('Nie znaleziono przepisu', 404) from None
    return Response(Recipe(recipe_id).get_data(), mimetype='application/json')
class Recipe:
    """Query helper for one recipe or for a comma-separated set of recipes.

    ``self.id`` is spliced into SQL text unquoted, either after ``id =`` or
    inside ``in (...)``, so it is a single integer id or a string such as
    ``"1,2,3"``.
    """

    # ``sort_by`` values understood by get_short_data_params besides the two
    # rating orders; the mixed hyphen/underscore spelling is part of the API.
    _ORDER_BY = {
        'duration-down': ' order by duration desc',
        'duration-up': ' order by duration',
        'difficulty_level_down': ' order by difficulty_level desc',
        'difficulty_level_up': ' order by difficulty_level',
        'created_at_down': ' order by created_at desc',
        'created_at_up': ' order by created_at',
    }

    def __init__(self, recipe_id):
        """Store ``recipe_id`` after checking that it only contains integers.

        Raises:
            InvalidUsage: 404 when any comma-separated part is not an integer.
        """
        self.id = self._checked_ids(recipe_id)

    @staticmethod
    def _checked_ids(recipe_id):
        """Return ``recipe_id`` in a form that is safe to splice into SQL."""
        if isinstance(recipe_id, int):
            return recipe_id
        try:
            # Re-render every part through int() so nothing but digits (and a
            # sign) can reach the query text.
            return ','.join(str(int(part)) for part in str(recipe_id).split(','))
        except ValueError:
            raise InvalidUsage('Nie znaleziono przepisu', 404) from None

    def get_short_data(self):
        """Return the recipe's summary columns plus its rating as a JSON string.

        Raises:
            InvalidUsage: 404 when the query returns no row.
        """
        recipes = dao.json_query(
            """select id,title,description,image,duration,portions,difficulty_level
            from recipes where id = {}""".format(self.id))
        if not recipes:
            raise InvalidUsage('Nie znaleziono przepisu', 404)
        recipe = recipes[0]
        recipe['rating'] = self.get_recipe_rating()
        return json.dumps(recipe, default=alchemy_encoder)

    def get_short_data_params(self, sort_by, items):
        """Return summaries of the recipes in ``self.id``, sorted and limited.

        Args:
            sort_by: ``'rating-down'``/``'rating-up'``, one of the keys of
                ``_ORDER_BY``, or anything else for no explicit order.
            items: maximum number of rows; a falsy value means no limit.

        Returns:
            A JSON string holding a single object when exactly one recipe is
            found and an array otherwise.  Every recipe carries its own
            ``rating`` as produced by ``get_recipe_rating``.

        Raises:
            InvalidUsage: 404 when no recipe is found, 400 when ``items`` is
                not an integer.
        """
        if sort_by in ("rating-down", "rating-up"):
            # The inner join means recipes without any rating are left out.
            query = """select a.*, b.rating from
                (select id,title,description,image,duration,portions,difficulty_level
                from recipes where id in ({ids})) a
                join
                (select sum(rating)/count(*) as rating, recipe_id
                from rated where recipe_id in ({ids}) group by recipe_id) b
                on a.id = b.recipe_id order by rating
                """.format(ids=self.id)
            if sort_by == "rating-down":
                query += " desc"
        else:
            query = """select id,title,description,image,duration,portions,difficulty_level
                from recipes where id in ({})""".format(self.id) + self._ORDER_BY.get(sort_by, "")

        if items:
            try:
                limit = int(items)
            except (TypeError, ValueError):
                raise InvalidUsage('Nieprawidlowy parametr items', 400) from None
            query += " limit {items}".format(items=limit)

        recipes = dao.json_query(query)
        if not recipes:
            raise InvalidUsage('Nie znaleziono przepisu', 404)
        for recipe in recipes:
            # Rate each recipe by its own id; rating through self would
            # aggregate over every id held by this instance.
            recipe['rating'] = Recipe(recipe['id']).get_recipe_rating()
        result = recipes[0] if len(recipes) == 1 else recipes
        return json.dumps(result, default=alchemy_encoder)

    def get_data(self):
        """Return the full recipe row with its related data as a JSON string.

        Adds ``creator``, ``categories``, ``devices``, ``ingredients``,
        ``rating`` and ``comments`` to the row.

        Raises:
            InvalidUsage: 404 when the query returns no row.
        """
        recipes = dao.json_query("select * from recipes where id = {}".format(self.id))
        if not recipes:
            raise InvalidUsage('Nie znaleziono przepisu', 404)
        recipe = recipes[0]
        recipe.update({
            'creator': self.get_recipe_creator(recipe),
            'categories': self.get_recipe_categories(),
            'devices': self.get_recipe_devices(),
            'ingredients': self.get_recipe_ingredients(),
            'rating': self.get_recipe_rating(),
            'comments': self.get_recipe_comments(),
        })
        return json.dumps(recipe, default=alchemy_encoder)

    def get_recipe_creator(self, recipe):
        """Remove ``creator_id`` from ``recipe`` and return that user's id and login.

        Returns the empty query result when no such user exists.
        """
        creator_id = recipe.pop('creator_id')
        creator = dao.json_query("select id,login from users where id = {}".format(creator_id))
        if creator:
            return creator[0]
        return creator

    def get_recipe_categories(self):
        """Return the ``categories`` rows linked through ``recipe_category``."""
        return dao.json_query(
            """with cte as (select category_id from recipe_category where recipe_id in ({}))
            select * from categories where id in (select category_id from cte)""".format(self.id))

    def get_recipe_devices(self):
        """Return the ``devices`` rows linked through ``devices_to_use``."""
        return dao.json_query(
            """with cte as (select device_id from devices_to_use where recipe_id in ({}))
            select * from devices where id in (select device_id from cte)""".format(self.id))

    def get_recipe_ingredients(self):
        """Return ``products`` joined with ``ingredients`` for the recipe(s)."""
        return dao.json_query(
            "select * from products a join ingredients b on a.id = b.product_id where b.recipe_id in ({})".format(
                self.id))

    def get_recipe_rating(self):
        """Return the number of ratings and their rounded average.

        When the request belongs to a session user who rated the recipe, that
        user's rating is added under ``user_rating``.  Returns ``None`` when
        the aggregate query yields no row.
        """
        ratings = dao.json_query(
            """select count(*) as quantity, round(sum(rating)/count(*),2) as overall
            from rated where recipe_id in ({})""".format(self.id))
        if not ratings:
            return None
        rating = ratings[0]
        user_id = get_current_session_user()[0]
        if user_id:
            user_ratings = dao.json_query(
                "select rating from rated where recipe_id in ({}) and user_id = {}".format(self.id, user_id))
            if user_ratings:
                rating['user_rating'] = user_ratings[0]['rating']
        return rating

    def get_recipe_comments(self):
        """Return the top-level comments, each with its author and its replies.

        ``author_id`` is replaced by ``creator``, which holds the result of
        the user lookup exactly as ``dao.json_query`` returns it.
        """
        comments = dao.json_query(
            "select id,date,content,author_id from comments where parent_comment_id is null and recipe_id in ({})".format(
                self.id))
        for comment in comments:
            author_id = comment.pop('author_id')
            comment['creator'] = dao.json_query(
                "select id,login from users where id = {}".format(author_id))
            replies = dao.json_query(
                "select id,date,content,author_id from comments where parent_comment_id = {}".format(comment['id']))
            for reply in replies:
                reply_author_id = reply.pop('author_id')
                reply['creator'] = dao.json_query(
                    "select id,login from users where id = {}".format(reply_author_id))
            comment['replies'] = replies
        return comments
def get_current_session_user():
    """Resolve the ``Authorization`` header to the user of a live session.

    Returns:
        ``(user_id, 'session_running')`` when the session exists and has not
        expired, ``(None, 'session_expired')`` when it is missing or expired,
        and ``(None, 'user_not_registered')`` when the header is absent.
    """
    session_key = request.headers.get('Authorization')
    if session_key is None:
        return None, 'user_not_registered'
    # Embed the key as a quoted literal with inner quotes doubled so it cannot
    # terminate the literal (assumes PostgreSQL's default
    # standard_conforming_strings=on).  Quotes a client wrapped around the key
    # are dropped first, because the value used to be inserted verbatim.
    key_literal = "'" + session_key.strip("'").replace("'", "''") + "'"
    session = dao.json_query(
        """select case when expires_at > now() then user_id else null end
        from sessions where session_key = {}""".format(key_literal))
    # The code reads the value of the unnamed CASE expression from key 'case'.
    if session and session[0].get('case') is not None:
        return session[0]['case'], 'session_running'
    return None, 'session_expired'
def table_to_string(table):
    """Return ``str(table)`` with every ``[`` and ``]`` character removed."""
    brackets = {ord("["): None, ord("]"): None}
    return str(table).translate(brackets)
@app.route('/api/v1/recipes-list', methods=["GET"])
def list_of_recipes():
    """Return the recipes matching the filters in the JSON body as a JSON array.

    Recognised keys: ``query`` (title substring), ``categories``,
    ``products_to_include``, ``products_to_exclude``, ``min_duration``,
    ``max_duration``, ``min_difficulty_level``, ``max_difficulty_level``,
    ``min_rating``, ``max_rating``, ``sort_by`` and ``items``.  Each filter
    yields a set of recipe ids and a recipe must be in every set.  When
    ``products_to_include`` is given, each recipe also carries
    ``number_of_user_products``.  The body is always a JSON array; it is empty
    when no recipe id survives the filters.

    Raises:
        InvalidUsage: 400 when ``items`` is not an integer.
            ``Recipe.get_short_data_params`` raises 404 when it finds no
            recipe for the surviving ids.
    """
    # A GET request may carry no (or no valid) JSON body at all.
    params = request.get_json(silent=True)
    if not isinstance(params, dict):
        params = {}

    def literal(value):
        # Numbers go in bare; everything else becomes a single-quoted literal
        # with embedded quotes doubled (assumes PostgreSQL's default
        # standard_conforming_strings=on).
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return str(value)
        return "'" + str(value).replace("'", "''") + "'"

    def as_items(values):
        # Accept a JSON array, a scalar, or a comma-separated string.
        if values is None:
            return []
        if isinstance(values, str):
            return [part.strip().strip("'") for part in values.split(',') if part.strip()]
        if isinstance(values, (list, tuple)):
            return list(values)
        return [values]

    def in_list(values):
        return ', '.join(literal(value) for value in values)

    queries = []

    browser_input = params.get('query')
    if browser_input:
        # The doubled percent signs are kept exactly as the query had them.
        queries.append(
            "select distinct id from recipes where lower(title) like '%%{browser_input}%%'".format(
                browser_input=str(browser_input).lower().replace("'", "''")))

    categories = as_items(params.get('categories'))
    if categories:
        queries.append(
            """select distinct a.id from recipes a join recipe_category b on a.id = b.recipe_id
            where b.category_id in ({})""".format(in_list(categories)))

    products_to_include = as_items(params.get('products_to_include'))
    if products_to_include:
        # First try the recipes that use every requested product.
        query = """select id from recipes a join ingredients b on a.id = b.recipe_id
            where b.product_id in ({}) group by a.id having count(*) = {}""".format(
            in_list(products_to_include), len(products_to_include))
        if len(dao.list_query(query)) < 3:
            # Too few full matches: fall back to recipes using any of them.
            # NOTE(review): the ascending order puts the recipes with the
            # fewest matching products first -- confirm this is intended.
            query = """select id from recipes a join ingredients b on a.id = b.recipe_id
                where b.product_id in ({}) group by a.id order by count(*) limit 10""".format(
                in_list(products_to_include))
        queries.append(query)

    products_to_exclude = as_items(params.get('products_to_exclude'))
    if products_to_exclude:
        queries.append(
            """select distinct id from recipes where id not in
            (select a.id from recipes a join ingredients b on a.id = b.recipe_id
            where b.product_id in ({}))""".format(in_list(products_to_exclude)))

    for key, column, operator in (
        ('min_duration', 'duration', '>='),
        ('max_duration', 'duration', '<='),
        ('min_difficulty_level', 'difficulty_level', '>='),
        ('max_difficulty_level', 'difficulty_level', '<='),
    ):
        if params.get(key) is not None:
            queries.append("select distinct id from recipes where {} {} {}".format(
                column, operator, literal(params[key])))

    # Join all id sets on id so that only ids present in every set remain.
    if not queries:
        main_query = "select id from recipes"
    elif len(queries) == 1:
        main_query = "select c.* from ({}) c".format(queries[0])
    else:
        joined = "({}) a".format(queries[0])
        for offset, sub_query in enumerate(queries[1:]):
            alias = chr(ord('b') + offset)
            joined += " join ({sub}) {alias} on a.id = {alias}.id".format(sub=sub_query, alias=alias)
        main_query = "select c.* from ({}) c".format(joined)
    res = dao.list_query(main_query)

    if res and ('min_rating' in params or 'max_rating' in params):
        min_rating = params.get('min_rating')
        if min_rating is None:
            min_rating = 0
        max_rating = params.get('max_rating')
        if max_rating is None:
            max_rating = 5
        kept = []
        for row in res:
            # Average of this recipe only; recipes nobody rated yield NULL
            # and are dropped when a rating filter is active.
            rating = dao.list_query(
                """select sum(rating)/count(*)
                from rated where recipe_id in ({})""".format(literal(row)))
            if rating and rating[0] is not None and min_rating <= rating[0] <= max_rating:
                kept.append(row)
        res = kept

    if not res:
        return Response(json.dumps([]), mimetype='application/json')

    items = params.get('items')
    if items is not None:
        try:
            # The limit is spliced into SQL, so only an integer may pass.
            items = int(items)
        except (TypeError, ValueError):
            raise InvalidUsage("", status_code=400) from None

    recipe_ids = ','.join(str(v) for v in res)
    # get_short_data_params returns a JSON string; decode it so the response
    # is encoded exactly once, and always as an array.
    recipes = json.loads(Recipe(recipe_ids).get_short_data_params(params.get('sort_by'), items))
    if isinstance(recipes, dict):
        recipes = [recipes]

    if products_to_include:
        for recipe in recipes:
            products_number = dao.list_query(
                """select count(*) from recipes a join ingredients b on a.id = b.recipe_id
                where b.product_id in ({}) and a.id = {}""".format(
                    in_list(products_to_include), literal(recipe['id'])))
            if products_number:
                recipe['number_of_user_products'] = products_number[0]

    return Response(json.dumps(recipes), mimetype='application/json')
if __name__ == '__main__':
    # Start the built-in server only when this file is executed as a script,
    # so importing the module does not block on a running server.
    app.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement