Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pickle
- import phonenumbers
- from phonenumbers.phonenumberutil import PhoneNumberFormat
- from sqlalchemy import func
- from sqlalchemy import func, text
- from sqlalchemy.sql.expression import select
- import pinn
- from pinn.name import Name
- from pinn.roles.role import Role, RoleType
- import pinn.storage.postgresql.schema
- from pinn.storage.postgresql.schema import roles, user_roles, users
- from pinn.storage.user import UserStore, NonExistingUserError
- from pinn.users.password import PasswordHash
- from pinn.users.user import User
- from pinn.users.user import UserValidationError
- from pinn.util import generate_into
- from pinn.validation import ValidationError
- from zope.interface import implementer
- from datetime import datetime
- from dateutil import relativedelta
@implementer(UserStore)
class PostgreSQLUserStore:
    """``UserStore`` implementation that reads and writes the ``users`` table.

    Every query is run through the ``db`` object given to the constructor;
    rows are converted to ``User`` objects with ``_row_to_user``.

    Methods decorated with ``@generate_into(list)`` are written as generators;
    presumably the decorator collects the yielded values into a list (the
    helper lives in ``pinn.util`` -- confirm there).
    """

    def __init__(self, db):
        """Keep ``db``, the object whose ``execute`` method runs all queries."""
        self._db = db

    @generate_into(list)
    def fetch_all_users(self):
        """Yield a ``User`` for every row in ``users``."""
        rows = self._db.execute(select([users])).fetchall()
        for row in rows:
            yield self._row_to_user(row)

    @generate_into(list)
    def fetch_all_users_with_role(self, role_name):
        """Yield every user linked through ``user_roles`` to the role ``role_name``."""
        query = (
            select([users, user_roles, roles], use_labels=True)
            .where(users.c.id == user_roles.c.user_id)
            .where(roles.c.id == user_roles.c.role_id)
            .where(roles.c.name == role_name)
        )
        rows = self._db.execute(query).fetchall()
        for row in rows:
            yield self._row_to_user(row)

    @generate_into(list)
    def fetch_all_loggedin_users(self):
        """Yield the users whose ``last_heartbeat`` is less than 10 minutes old.

        Rows with a NULL ``last_heartbeat`` never match the comparison and are
        therefore not returned.
        """
        # Local import: the file only imports ``datetime`` from ``datetime``.
        from datetime import timedelta

        # The previous expression, ``last_heartbeat - now < relativedelta(...)``,
        # both called the ``relativedelta`` *module* (TypeError) and had its
        # operands reversed (a past heartbeat minus "now" is negative, hence
        # always below the limit).  Comparing against a cutoff computed in
        # Python avoids both problems.
        cutoff = datetime.now() - timedelta(minutes=10)
        query = select([users]).where(users.c.last_heartbeat > cutoff)
        rows = self._db.execute(query).fetchall()
        for row in rows:
            yield self._row_to_user(row)

    def update_user_last_heartbeat(self, user):
        """Store ``user.last_heartbeat`` in the row whose id is ``user.id``.

        Raises:
            NonExistingUserError: if no row with ``user.id`` exists.
        """
        if self.fetch_user_by_id(user.id) is None:
            raise NonExistingUserError()
        # Read the value from the ``user`` argument; the store itself has no
        # ``user`` attribute (``self.user`` raised AttributeError).
        last_heartbeat = user.last_heartbeat
        query = (
            users.update()
            .values(last_heartbeat=last_heartbeat)
            .where(users.c.id == user.id)
        )
        self._db.execute(query)

    def fetch_user_by_id(self, id):
        """Return the ``User`` with primary key ``id``, or the falsy result of
        ``first()`` when no row matches."""
        query = select([users]).where(users.c.id == id)
        row = self._db.execute(query).first()
        return row and self._row_to_user(row)

    def fetch_wps_key_by_user_id(self, id):
        """Return the ``wps_key`` column of the user with primary key ``id``."""
        query = select([users.c.wps_key]).where(users.c.id == id)
        return self._db.execute(query).scalar()

    def fetch_user_by_email_address(self, email_address):
        """Return the first user whose e-mail address equals ``email_address``.

        Both sides are passed through SQL ``lower()``, so the match ignores
        case.  Returns the falsy result of ``first()`` when nothing matches.
        """
        query = select([users]).where(
            func.lower(email_address) == func.lower(users.c.email_address))
        row = self._db.execute(query).first()
        return row and self._row_to_user(row)

    def insert_user(self, user):
        """Insert ``user`` as a new row and return the first element of the
        inserted primary key."""
        first_name, tussenvoegsel, last_name = self._separate_name(user.name)
        formatted_phone_numbers = self._format_phone_numbers(user.phone_numbers)
        password_recovery_hash, password_recovery_rounds, password_recovery_salt = \
            self._get_password_recovery_hash(user)
        query = users.insert().values(
            first_name=first_name,
            tussenvoegsel=tussenvoegsel,
            last_name=last_name,
            password_hash=user.password_hash.hash,
            password_salt=user.password_hash.salt,
            password_rounds=user.password_hash.rounds,
            email_address=user.email_address,
            phone_numbers=formatted_phone_numbers,
            password_recovery_hash=password_recovery_hash,
            password_recovery_salt=password_recovery_salt,
            password_recovery_rounds=password_recovery_rounds,
            must_change_password=user.must_change_password,
            created_at=user.created_at,
            active=user.active,
        )
        return self._db.execute(query).inserted_primary_key[0]

    def update_user(self, user):
        """Overwrite the stored columns of the row with id ``user.id``.

        ``created_at`` and ``last_heartbeat`` are not part of the update.

        Raises:
            NonExistingUserError: if no row with ``user.id`` exists.
        """
        if self.fetch_user_by_id(user.id) is None:
            raise NonExistingUserError()
        first_name, tussenvoegsel, last_name = self._separate_name(user.name)
        formatted_phone_numbers = self._format_phone_numbers(user.phone_numbers)
        password_recovery_hash, password_recovery_rounds, password_recovery_salt = \
            self._get_password_recovery_hash(user)
        query = users.update().values(
            first_name=first_name,
            tussenvoegsel=tussenvoegsel,
            last_name=last_name,
            password_hash=user.password_hash.hash,
            password_salt=user.password_hash.salt,
            password_rounds=user.password_hash.rounds,
            email_address=user.email_address,
            phone_numbers=formatted_phone_numbers,
            password_recovery_hash=password_recovery_hash,
            password_recovery_salt=password_recovery_salt,
            password_recovery_rounds=password_recovery_rounds,
            must_change_password=user.must_change_password,
            active=user.active,
        ).where(users.c.id == user.id)
        self._db.execute(query)

    def delete_user_by_id(self, id):
        """Delete the row with primary key ``id``."""
        query = users.delete().where(users.c.id == id)
        self._db.execute(query)

    def fetch_user_id_by_api_key(self, api_key):
        """Return the id of the user whose ``api_key`` column equals ``api_key``."""
        query = text("""
            SELECT id
            FROM users
            WHERE api_key = :api_key
        """)
        return self._db.execute(query, api_key=api_key).scalar()

    def fetch_api_key_by_user_id(self, user_id):
        """Return the ``api_key`` column of the user with id ``user_id``."""
        query = text("""
            SELECT api_key
            FROM users
            WHERE id = :id
        """)
        return self._db.execute(query, id=user_id).scalar()

    def _separate_name(self, name):
        """Return ``(first_name, tussenvoegsel, last_name)`` for ``name``.

        ``name.last_name`` is either ``None`` or an indexable pair whose first
        item is the tussenvoegsel and whose second item is the last name.
        """
        if name.last_name is None:
            return (name.first_name, None, None)
        return (name.first_name, name.last_name[0], name.last_name[1])

    def _get_password_recovery_hash(self, user):
        """Return ``(hash, rounds, salt)`` of ``user.password_recovery_hash``.

        All three are ``None`` when the user has no recovery hash.  Note the
        order: rounds comes before salt, and callers unpack in that order.
        """
        recovery = user.password_recovery_hash
        if recovery is None:
            return None, None, None
        return recovery.hash, recovery.rounds, recovery.salt

    def _format_phone_numbers(self, phone_numbers):
        """Return each phone number formatted as ``PhoneNumberFormat.INTERNATIONAL``."""
        return [phonenumbers.format_number(p, PhoneNumberFormat.INTERNATIONAL)
                for p in phone_numbers]

    def _row_to_user(self, row):
        """Build a ``User`` from a result row containing the ``users`` columns."""
        user = User(
            id=row[users.c.id],
            name=row_to_name(row),
            password_hash=self._row_to_password_hash(row),
            email_address=row[users.c.email_address],
            phone_numbers=[phonenumbers.parse(p) for p in row[users.c.phone_numbers]],
            password_recovery_hash=self._row_to_password_recovery_hash(row),
            must_change_password=row[users.c.must_change_password],
            created_at=row[users.c.created_at],
            active=row[users.c.active],
            last_heartbeat=row[users.c.last_heartbeat],
        )
        return user

    def _row_to_password_hash(self, row):
        """Build the ``PasswordHash`` from the row's hash, salt and rounds columns."""
        return PasswordHash(
            row[users.c.password_hash],
            row[users.c.password_salt],
            row[users.c.password_rounds],
        )

    def _row_to_password_recovery_hash(self, row):
        """Build the recovery ``PasswordHash``, or return ``None`` when the
        row's ``password_recovery_hash`` column is NULL."""
        if row[users.c.password_recovery_hash] is not None:
            return PasswordHash(
                row[users.c.password_recovery_hash],
                row[users.c.password_recovery_salt],
                row[users.c.password_recovery_rounds],
            )
        return None

    @generate_into(list)
    def fetch_all_account_managers(self):
        """Yield every user that has the ``RoleType.ACCOUNT_MANAGER`` role."""
        joined = (
            users
            .join(user_roles, users.c.id == user_roles.c.user_id)
            .join(roles, roles.c.id == user_roles.c.role_id)
        )
        query = (
            select([joined], use_labels=True)
            .where(roles.c.name == RoleType.ACCOUNT_MANAGER.value)
        )
        rows = self._db.execute(query).fetchall()
        for row in rows:
            yield self._row_to_user(row)
def _check_duplicate_email_address(db, user):
    """Reject ``user`` when a different row already uses its e-mail address.

    Args:
        db: object whose ``execute`` method runs the query.
        user: the user being stored; the row with ``user.id`` itself is
            excluded from the check.

    Raises:
        ValidationError: carrying
            ``UserValidationError.EMAIL_ADDRESS_ALREADY_IN_USE`` when at least
            one other row has the same ``email_address``.
    """
    # ``> 0`` instead of ``== 1``: if the table already contains several rows
    # with this address, the check must still fire rather than let yet
    # another duplicate through.
    query = (select([func.count(users.c.id) > 0])
             .where(users.c.email_address == user.email_address)
             .where(users.c.id != user.id))
    if db.execute(query).scalar():
        raise ValidationError(
            {UserValidationError.EMAIL_ADDRESS_ALREADY_IN_USE})
def _row_to_password_hash(row):
    """Build the ``PasswordHash`` from the row's hash, salt and rounds columns."""
    digest = row[users.c.password_hash]
    salt = row[users.c.password_salt]
    rounds = row[users.c.password_rounds]
    return PasswordHash(digest, salt, rounds)
def _row_to_password_recovery_hash(row):
    """Build the recovery ``PasswordHash`` for ``row``.

    Returns ``None`` when the ``password_recovery_hash`` column is NULL.
    """
    if row[users.c.password_recovery_hash] is None:
        return None
    digest = row[users.c.password_recovery_hash]
    salt = row[users.c.password_recovery_salt]
    rounds = row[users.c.password_recovery_rounds]
    return PasswordHash(digest, salt, rounds)
def _row_to_user(row):
    """Build a ``User`` from a result row of the ``users`` table.

    The first six ``User`` arguments are passed positionally (id, name,
    password hash, e-mail address, parsed phone numbers, recovery hash);
    ``created_at`` is passed by keyword.
    """
    user_id = row[users.c.id]
    name = row_to_name(row)
    password_hash = _row_to_password_hash(row)
    email_address = row[users.c.email_address]
    parsed_numbers = []
    for raw_number in row[users.c.phone_numbers]:
        parsed_numbers.append(phonenumbers.parse(raw_number))
    recovery_hash = _row_to_password_recovery_hash(row)
    return User(
        user_id,
        name,
        password_hash,
        email_address,
        parsed_numbers,
        recovery_hash,
        created_at=row[users.c.created_at],
    )
def row_to_name(row):
    """Build a ``Name`` from the row's first name, tussenvoegsel and last name.

    The second ``Name`` argument is ``None`` only when both the tussenvoegsel
    and the last name are NULL; otherwise it is the pair
    ``(tussenvoegsel, last_name)``.
    """
    given = row[users.c.first_name]
    infix = row[users.c.tussenvoegsel]
    surname = row[users.c.last_name]
    has_surname_part = surname is not None or infix is not None
    if has_surname_part:
        return Name(given, (infix, surname))
    return Name(given, None)
def backup_user(backup_dir, user):
    """Pickle ``user`` into ``backup_dir``.

    The file is named ``<user.id>.<pinn.__version__>`` and is opened in
    binary write mode, so an existing file of that name is overwritten.
    """
    file_name = f"{user.id}.{pinn.__version__}"
    target = backup_dir / file_name
    with target.open('wb') as sink:
        pickle.dump(user, sink)
def _format_phone_numbers(phone_numbers):
    """Return a list with each phone number formatted as
    ``PhoneNumberFormat.INTERNATIONAL``."""
    formatted = []
    for number in phone_numbers:
        formatted.append(
            phonenumbers.format_number(number, PhoneNumberFormat.INTERNATIONAL))
    return formatted
def insert_user(db, user):
    """Insert ``user`` as a new, active row and return its primary key.

    The e-mail address is first checked with
    ``_check_duplicate_email_address``, which raises when it is already in
    use.  The three recovery columns are stored as NULL when the user has no
    password recovery hash.  ``active`` is always stored as ``True``.
    """
    _check_duplicate_email_address(db, user)
    formatted_phone_numbers = _format_phone_numbers(user.phone_numbers)
    first_name, tussenvoegsel, last_name = _get_name(user)

    recovery = user.password_recovery_hash
    if recovery is None:
        recovery_hash = recovery_salt = recovery_rounds = None
    else:
        recovery_hash = recovery.hash
        recovery_salt = recovery.salt
        recovery_rounds = recovery.rounds

    column_values = {
        'first_name': first_name,
        'tussenvoegsel': tussenvoegsel,
        'last_name': last_name,
        'password_hash': user.password_hash.hash,
        'password_salt': user.password_hash.salt,
        'password_rounds': user.password_hash.rounds,
        'email_address': user.email_address,
        'phone_numbers': formatted_phone_numbers,
        'password_recovery_hash': recovery_hash,
        'password_recovery_salt': recovery_salt,
        'password_recovery_rounds': recovery_rounds,
        'created_at': user.created_at,
        'active': True,
    }
    result = db.execute(users.insert().values(**column_values))
    return result.inserted_primary_key[0]
def update_user(db, user):
    """Overwrite name, password, e-mail, phone and recovery columns of the
    row with id ``user.id``.

    The e-mail address is first checked with
    ``_check_duplicate_email_address``, which raises when another row already
    uses it.  The recovery columns are stored as NULL when the user has no
    password recovery hash.
    """
    _check_duplicate_email_address(db, user)
    formatted_phone_numbers = _format_phone_numbers(user.phone_numbers)
    first_name, tussenvoegsel, last_name = _get_name(user)

    recovery = user.password_recovery_hash
    if recovery is None:
        recovery_hash = recovery_salt = recovery_rounds = None
    else:
        recovery_hash = recovery.hash
        recovery_salt = recovery.salt
        recovery_rounds = recovery.rounds

    column_values = {
        'first_name': first_name,
        'tussenvoegsel': tussenvoegsel,
        'last_name': last_name,
        'password_hash': user.password_hash.hash,
        'password_salt': user.password_hash.salt,
        'password_rounds': user.password_hash.rounds,
        'email_address': user.email_address,
        'phone_numbers': formatted_phone_numbers,
        'password_recovery_hash': recovery_hash,
        'password_recovery_salt': recovery_salt,
        'password_recovery_rounds': recovery_rounds,
    }
    statement = users.update().values(**column_values).where(users.c.id == user.id)
    db.execute(statement)
- def _get_name(user):
- first_name = user.name.first_name
- if user.name.last_name is None:
- tussenvoegsel = None
- last_name = None
- else:
- tussenvoegsel = user.name.last_name[0]
- last_name = user.name.last_name[1]
- return first_name, tussenvoegsel, last_name
def fetch_users(db):
    """Return a list with a ``User`` for every row in the ``users`` table."""
    result = db.execute(select([users]))
    return [_row_to_user(row) for row in result]
def fetch_user_by_id(db, id):
    """Return the ``User`` with primary key ``id``.

    When no row matches, the falsy result of ``first()`` is returned as is.
    """
    row = db.execute(select([users]).where(users.c.id == id)).first()
    if not row:
        return row
    return _row_to_user(row)
def fetch_user_by_email_address(db, email_address):
    """Return the first ``User`` whose ``email_address`` column equals
    ``email_address`` exactly.

    When no row matches, the falsy result of ``first()`` is returned as is.
    """
    matching = select([users]).where(users.c.email_address == email_address)
    row = db.execute(matching).first()
    if not row:
        return row
    return _row_to_user(row)
def delete_user_with_id(db, backup_dir, id):
    """Back up the user with primary key ``id`` to ``backup_dir``, then delete
    its row.

    Nothing is done when no such user exists.
    """
    # NOTE(review): the flattened source does not show whether the DELETE was
    # nested under the ``is not None`` check.  With no matching row the DELETE
    # would affect nothing, so both readings leave the table the same.
    user = fetch_user_by_id(db, id)
    if user is None:
        return
    backup_user(backup_dir, user)
    db.execute(users.delete().where(users.c.id == id))
def fetch_user_by_password_recovery_token(db, token):
    """Return the first user whose password recovery hash matches ``token``.

    Every user returned by ``fetch_users`` is tried in turn; users without a
    recovery hash are skipped.  Returns ``None`` when no user matches.
    """
    for candidate in fetch_users(db):
        recovery = candidate.password_recovery_hash
        if recovery is None:
            continue
        if recovery.matches_password(token):
            return candidate
    return None
def fetch_all_roles(db):
    """Return the set of ``Role`` objects built from every row in ``roles``."""
    found = set()
    for row in db.execute(select([roles])):
        found.add(_row_to_role(row))
    return found
def add_role_to_user(db, user_id, role_id):
    """Insert a ``user_roles`` row linking ``user_id`` to ``role_id``."""
    link = {'user_id': user_id, 'role_id': role_id}
    db.execute(user_roles.insert().values(**link))
def remove_role_from_user(db, user_id, role_id):
    """Delete the ``user_roles`` rows linking ``user_id`` to ``role_id``."""
    statement = user_roles.delete()
    statement = statement.where(user_roles.c.user_id == user_id)
    statement = statement.where(user_roles.c.role_id == role_id)
    db.execute(statement)
def _row_to_role(row):
    """Build a ``Role`` from the row's ``roles.id`` and ``roles.name`` columns."""
    role_id = row[roles.c.id]
    role_name = row[roles.c.name]
    return Role(role_id, role_name)
def fetch_user_roles(db, user_id):
    """Return the set of ``Role`` objects assigned to ``user_id``.

    ``user_roles`` is joined to ``roles`` without an explicit ON clause, so
    the join condition is left to SQLAlchemy to derive.
    """
    joined = user_roles.join(roles)
    query = select([joined]).where(user_roles.c.user_id == user_id)
    assigned = set()
    for row in db.execute(query):
        assigned.add(_row_to_role(row))
    return assigned
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement