Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # coding=utf-8
- import hashlib
- from functools import partial, wraps
- try:
- import ujson as json
- except Exception:
- import json
- import libuuid
- import six
- from abc import ABCMeta, abstractmethod
- from flask import request, url_for
- from flask_login import login_user
- from flask_restful import Resource
- from flask_restful.reqparse import RequestParser
- from sqlalchemy.exc import SQLAlchemyError
- from werkzeug.exceptions import BadRequest
- from hasoffers.core.model import Session, User, LoginHistory
- from hasoffers.core.util.config import ConfigReader
- from hasoffers.core.util.helpers.redis_actions import (
- ACCOUNT_CHANGES,
- RedisSession,
- )
- from hasoffers.core.util.ip import get_current_ip_from_request
- from hasoffers.core.util.mailsender import MailSender
- from hasoffers.core.util.redis_session import RedisSessionInterface
- from hasoffers.core.util.structlogger import get_logger
- from hasoffers.core.util.traffichelper import TrafficHelper
- from hasoffers.partner_plus.controllers.auth import Auth
- logger = get_logger()
- cfg = ConfigReader.get_config('hasoffers.partner_plus', keep_letters_case=True)
- def check_if_result_exists(f=None, exc_message=None):
- """
- Проверка, что функция возвращает не None,
- иначе рейзится BadRequest
- :param f: оборачиваемая функция
- :param exc_message: сообщение, отправляемое на клиент при ошибке
- """
- if f is None:
- return partial(check_if_result_exists, exc_message)
- exc_message = exc_message or 'Some errors occurred'
- @wraps
- def wrapper(*args, **kwargs):
- res = f(*args, **kwargs)
- if not res:
- raise BadRequest(exc_message)
- return res
- return wrapper
- class LoginMixin(object):
- @staticmethod
- def make_login_history(user_id):
- lh = LoginHistory()
- traffic_helper = TrafficHelper()
- lh.user_id = user_id
- lh.ip = get_current_ip_from_request()
- lh.domain = request.host
- lh.country = traffic_helper.country.name or ''
- lh.user_agent = traffic_helper.bsh.user_agent or ''
- lh.device = traffic_helper.bsh._uap.device.family or ''
- lh.os = traffic_helper.bsh.family or ''
- lh.browser = traffic_helper.bsh._uap.browser.family or ''
- return lh
- @staticmethod
- def login_user_or_raise_bad_request(user):
- try:
- login_user(user, False, force=True)
- except Exception:
- logger.exception(
- 'Error while user logging in (resetting password)')
- raise BadRequest('Error: user cannot be logged')
- else:
- # Разлогинить пользователя со всех клиентов
- RedisSessionInterface.delete_user_sessions(user.id)
- return
- @six.with_metaclass(ABCMeta)
- class CredentialsResetResource(Resource):
- REDIS = RedisSession(db_id=ACCOUNT_CHANGES)
- PWD_RECOVERY_TTL = cfg.getint('flask', 'password_recovery_ttl')
- TWOFA_RECOVERY_TTL = cfg.getint('flask', 'twofa_recovery_ttl')
- CREDENTIAL_TYPE = None
- get_parser = RequestParser()
- get_parser.add_argument('key', type=six.text_type, required=True)
- @abstractmethod
- def get(self):
- pass
- @abstractmethod
- def post(self):
- pass
- @staticmethod
- def _get_hash_from_text(text):
- return hashlib.md5(text.encode('utf8')).hexdigest()
- @staticmethod
- @check_if_result_exists('Error. There is no user with such email address')
- def _get_user_from_email_or_raise_exception(email):
- return Session.query(User).filter(User.email == email).first()
- @check_if_result_exists('Error: the link is obsolete')
- def _get_data_from_redis_or_raise_exception(self, key):
- return self.REDIS.get(key)
- @staticmethod
- def _make_response(user):
- res = Auth.user_to_dict(user)
- res['message'] = 'Password changed.'
- res['redirect_to'] = url_for(
- endpoint='PartnerIndex.index',
- _external=True,
- _scheme='',
- )
- res['redirect'] = True
- return res
- @staticmethod
- def _get_mail_ctx_from_key_and_user(key, user):
- return {
- 'token': key,
- 'username': user.name,
- 'email': user.email,
- }
- def _generate_token_from_params(self, *params):
- return self._get_hash_from_text(''.join(params))
- class ResetPasswordResource(LoginMixin, CredentialsResetResource):
- CREDENTIAL_TYPE = 'reset_password'
- SET_PASSWORD = 'set_password'
- RESET_PASSWORD = 'reset_password'
- ACTION_CHOICES = (SET_PASSWORD, RESET_PASSWORD,)
- SUCCESS_RESET_MSG = 'An email notification has been sent with ' \
- 'additional information. Please check your mailbox.'
- post_parser = RequestParser()
- post_parser.add_argument('email', type=six.text_type, required=True)
- post_parser.add_argument('action', type=six.text_type, choices=ACTION_CHOICES, required=True)
- def get(self):
- args = self.get_parser.parse_args()
- data = self._get_data_from_redis_or_raise_exception(args.key)
- user = User.Get(data['user_id'])
- self.login_user_or_raise_bad_request(user)
- if not user.exist_flag(User.Flags.entered):
- user.set_flag(User.Flags.entered)
- res = self._make_response(user)
- lh = self.make_login_history(user.id)
- Session.add(lh)
- try:
- Session.commit()
- except SQLAlchemyError:
- Session.rollback()
- logger.exception('Error while saving login history')
- return res
- def post(self):
- context = self.post_parser.parse_args()
- user = self._get_user_from_email_or_raise_exception(context.email)
- hashed_password = self._generate_token_from_params(context.password)
- context.update({
- 'user': user,
- 'hashed_password': hashed_password,
- })
- return self.execute_action(context)
- def execute_action(self, context):
- return getattr(self, context['action'])(context)
- @staticmethod
- def set_password(context):
- user = context['user']
- user.password = context['hashed_password']
- try:
- Session.commit()
- except SQLAlchemyError:
- Session.rollback()
- logger.exception(
- 'Error while resetting password!\n '
- 'User: [{0}] {1}'.format(user.id, user.name)
- )
- raise BadRequest('Error while resetting password')
- else:
- return {'message': 'Success. Password was changed.'}
- def reset_password(self, context):
- user = context['user']
- password = context['password']
- key = self._generate_token_from_params(user.id, password)
- self.REDIS.setex(
- name=key,
- time=self.PWD_RECOVERY_TTL,
- value=json.dumps({
- 'user_id': user.id,
- 'password': context['hashed_password'],
- })
- )
- mail_template_ctx = self._get_mail_ctx_from_key_and_user(key, user)
- MailSender.send_service_message(
- reason=self.CREDENTIAL_TYPE,
- context=mail_template_ctx,
- emails=[(user.email, user.name)],
- important=True,
- )
- return {
- 'message': self.SUCCESS_RESET_MSG
- }
- class ResetTwoFAResource(CredentialsResetResource):
- CREDENTIAL_TYPE = 'reset_two_fa'
- post_parser = RequestParser()
- post_parser.add_argument('email', type=six.text_type, required=True)
- def get(self):
- args = self.get_parser.parse_args()
- data = self._get_data_from_redis_or_raise_exception(args.key)
- user = User.Get(data['user_id'])
- user.otp_secret = ''
- try:
- Session.commit()
- except SQLAlchemyError:
- Session.rollback()
- logger.exception('Error while saving login history')
- return {'message': 'OTP disabled'}
- def post(self):
- args = self.post_parser.parse_args()
- user = self._get_user_from_email_or_raise_exception(args.email)
- key = libuuid.uuid4().hex
- self.REDIS.setex(
- name='otp_{}'.format(key),
- time=self.TWOFA_RECOVERY_TTL,
- value=user.id
- )
- mail_template_ctx = self._get_mail_ctx_from_key_and_user(key, user)
- MailSender.send_service_message(
- reason='otp_disable',
- context=mail_template_ctx,
- emails=[(user.email, user.name)],
- important=True,
- )
- return {'message': self.SUCCESS_RESET_MSG}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement