Advertisement
Guest User

models.py

a guest
Aug 24th, 2016
123
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.83 KB | None | 0 0
  1. from datetime import datetime
  2. import hashlib
  3. from werkzeug.security import generate_password_hash, check_password_hash
  4. from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
  5. from markdown import markdown
  6. import bleach
  7. from flask import current_app, request, url_for
  8. from flask_login import UserMixin, AnonymousUserMixin
  9. from app.exceptions import ValidationError
  10. from . import db, login_manager
  11.  
  12.  
  13. class Permission:
  14. FOLLOW = 0x01
  15. COMMENT = 0x02
  16. WRITE_ARTICLES = 0x04
  17. MODERATE_COMMENTS = 0x08
  18. ADMINISTER = 0x80
  19.  
  20.  
  21. class Role(db.Model):
  22. __tablename__ = 'roles'
  23. id = db.Column(db.Integer, primary_key=True)
  24. name = db.Column(db.String(64), unique=True)
  25. default = db.Column(db.Boolean, default=False, index=True)
  26. permissions = db.Column(db.Integer)
  27. users = db.relationship('User', backref='role', lazy='dynamic')
  28.  
  29. @staticmethod
  30. def insert_roles():
  31. roles = {
  32. 'User': (Permission.FOLLOW |
  33. Permission.COMMENT |
  34. Permission.WRITE_ARTICLES, True),
  35. 'Moderator': (Permission.FOLLOW |
  36. Permission.COMMENT |
  37. Permission.WRITE_ARTICLES |
  38. Permission.MODERATE_COMMENTS, False),
  39. 'Administrator': (0xff, False)
  40. }
  41.  
  42. for r in roles:
  43. role = Role.query.filter_by(name=r).first()
  44. if role is None:
  45. role = Role(name=r)
  46. role.permissions = roles[r][0]
  47. role.default = roles[r][1]
  48. db.session.add(role)
  49. db.session.commit()
  50.  
  51. def __repr__(self):
  52. return '<Role %r>' % self.name
  53.  
  54.  
  55. class Follow(db.Model):
  56. __tablename__ = 'follows'
  57. follower_id = db.Column(db.Integer, db.ForeignKey('users.id'),
  58. primary_key=True)
  59. followed_id = db.Column(db.Integer, db.ForeignKey('users.id'),
  60. primary_key=True)
  61. timestamp = db.Column(db.DateTime, default=datetime.utcnow)
  62.  
  63.  
  64. class User(UserMixin, db.Model):
  65. __tablename__ = 'users'
  66. id = db.Column(db.Integer, primary_key=True)
  67. email = db.Column(db.String(64), unique=True, index=True)
  68. username = db.Column(db.String(64), unique=True, index=True)
  69. role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
  70. password_hash = db.Column(db.String(128))
  71. confirmed = db.Column(db.Boolean, default=False)
  72. name = db.Column(db.String(64))
  73. location = db.Column(db.String(64))
  74. about_me = db.Column(db.Text())
  75. member_since = db.Column(db.DateTime(), default=datetime.utcnow)
  76. last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
  77. avatar_hash = db.Column(db.String(32))
  78. posts = db.relationship('Post', backref='author', lazy='dynamic')
  79. followed = db.relationship('Follow',
  80. foreign_keys=[Follow.follower_id],
  81. backref=db.backref('follower', lazy='joined'),
  82. lazy='dynamic',
  83. cascade='all, delete-orphan')
  84. followers = db.relationship('Follow',
  85. foreign_keys=[Follow.followed_id],
  86. backref=db.backref('followed', lazy='joined'),
  87. lazy='dynamic',
  88. cascade='all, delete-orphan')
  89. comments = db.relationship('Comment', backref='author', lazy='dynamic')
  90.  
  91.  
  92. @staticmethod
  93. def generate_fake(count=100):
  94. from sqlalchemy.exc import IntegrityError
  95. from random import seed
  96. import forgery_py
  97.  
  98. seed()
  99. for i in range(count):
  100. u = User(email=forgery_py.internet.email_address(),
  101. username=forgery_py.internet.user_name(True),
  102. password=forgery_py.lorem_ipsum.word(),
  103. confirmed=True,
  104. name=forgery_py.name.full_name(),
  105. location=forgery_py.address.city(),
  106. about_me=forgery_py.lorem_ipsum.sentence(),
  107. member_since=forgery_py.date.date(True))
  108. db.session.add(u)
  109. try:
  110. db.session.commit()
  111. except IntegrityError:
  112. db.session.rollback()
  113.  
  114. @staticmethod
  115. def add_self_follows():
  116. for user in User.query.all():
  117. if not user.is_following(user):
  118. user.follow(user)
  119. db.session.add(user)
  120. db.session.commit()
  121.  
  122. def __init__(self, **kwargs):
  123. super(User, self).__init__(**kwargs)
  124. if self.role is None:
  125. if self.email == current_app.config['PROJECT_ADMIN']:
  126. self.role = Role.query.filter_by(permissions=0xff).first()
  127. if self.role is None:
  128. self.role = Role.query.filter_by(default=True).first()
  129. if self.email is not None and self.avatar_hash is None:
  130. self.avatar_hash = hashlib.md5(
  131. self.email.encode('utf-8')).hexdigest()
  132. self.followed.append(Follow(followed=self))
  133.  
  134. @property
  135. def password(self):
  136. raise AttributeError('password is not a readable attribute')
  137.  
  138. @password.setter
  139. def password(self, password):
  140. self.password_hash = generate_password_hash(password)
  141.  
  142. def verify_password(self, password):
  143. return check_password_hash(self.password_hash, password)
  144.  
  145. def generate_confirmation_token(self, expiration=3600):
  146. s = Serializer(current_app.config['SECRET_KEY'], expiration)
  147. return s.dumps({'confirm': self.id})
  148.  
  149. def confirm(self, token):
  150. s = Serializer(current_app.config['SECRET_KEY'])
  151. try:
  152. data = s.loads(token)
  153. except:
  154. return False
  155. if data.get('confirm') != self.id:
  156. return False
  157. self.confirmed = True
  158. db.session.add(self)
  159. return True
  160.  
  161. def generate_reset_token(self, expiration=3600):
  162. s = Serializer(current_app.config['SECRET_KEY'], expiration)
  163. return s.dumps({'reset': self.id})
  164.  
  165. def reset_password(self, token, new_password):
  166. s = Serializer(current_app.config['SECRET_KEY'])
  167. try:
  168. data = s.loads(token)
  169. except:
  170. return False
  171. if data.get('reset') != self.id:
  172. return False
  173. self.password = new_password
  174. db.session.add(self)
  175. return True
  176.  
  177. def generate_email_change_token(self, new_email, expiration=3600):
  178. s = Serializer(current_app.config['SECRET_KEY'], expiration)
  179. return s.dumps({'change_email': self.id, 'new_email': new_email})
  180.  
  181. def change_email(self, token):
  182. s = Serializer(current_app.config['SECRET_KEY'])
  183. try:
  184. data = s.loads(token)
  185. except:
  186. return False
  187. if data.get('change_email') != self.id:
  188. return False
  189. new_email = data.get('new_email')
  190. if new_email is None:
  191. return False
  192. if self.query.filter_by(email=new_email).first() is not None:
  193. return False
  194. self.email = new_email
  195. self.avatar_hash = hashlib.md5(
  196. self.email.encode('utf-8')).hexdigest()
  197. db.session.add(self)
  198. return True
  199.  
  200. def can(self, permissions):
  201. return self.role is not None and \
  202. (self.role.permissions & permissions) == permissions
  203.  
  204. def is_administrator(self):
  205. return self.can(Permission.ADMINISTER)
  206.  
  207. def ping(self):
  208. self.last_seen = datetime.utcnow()
  209. db.session.add(self)
  210.  
  211. def gravatar(self, size=100, default='identicon', rating='g'):
  212. if request.is_secure:
  213. url = 'https://secure.gravatar.com/avatar'
  214. else:
  215. url = 'http://www.gravatar.com/avatar'
  216. hash = self.avatar_hash or hashlib.md5(
  217. self.email.encode('utf-8')).hexdigest()
  218. return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(
  219. url=url, hash=hash, size=size, default=default, rating=rating)
  220.  
  221. def follow(self, user):
  222. if not self.is_following(user):
  223. f = Follow(follower=self, followed=user)
  224. db.session.add(f)
  225.  
  226. def unfollow(self, user):
  227. f = self.followed.filter_by(followed_id=user.id).first()
  228. if f:
  229. db.session.delete(f)
  230.  
  231. def is_following(self, user):
  232. return self.followed.filter_by(
  233. followed_id=user.id).first() is not None
  234.  
  235. def is_followed_by(self, user):
  236. return self.followers.filter_by(
  237. follower_id=user.id).first() is not None
  238.  
  239. @property
  240. def followed_posts(self):
  241. return Post.query.join(Follow, Follow.followed_id == Post.author_id)\
  242. .filter(Follow.follower_id == self.id)
  243.  
  244. def to_json(self):
  245. json_user = {
  246. 'url': url_for('api.get_user', id=self.id, _external=True),
  247. 'username': self.username,
  248. 'member_since': self.member_since,
  249. 'last_seen': self.last_seen,
  250. 'posts': url_for('api.get_user_posts', id=self.id, _external=True),
  251. 'followed_posts': url_for('api.get_user_followed_posts',
  252. id=self.id, _external=True),
  253. 'post_count': self.posts.count()
  254. }
  255. return json_user
  256.  
  257. def generate_auth_token(self, expiration):
  258. s = Serializer(current_app.config['SECRET_KEY'],
  259. expires_in=expiration)
  260. return s.dumps({'id': self.id}).decode('ascii')
  261.  
  262. @staticmethod
  263. def verify_auth_token(token):
  264. s = Serializer(current_app.config['SECRET_KEY'])
  265. try:
  266. data = s.loads(token)
  267. except:
  268. return None
  269. return User.query.get(data['id'])
  270.  
  271. def __repr__(self):
  272. return '<User %r>' % self.username
  273.  
  274.  
  275. class AnonymousUser(AnonymousUserMixin):
  276. def can(self, permissions):
  277. return False
  278.  
  279. def is_administrator(self):
  280. return False
  281.  
  282. login_manager.anonymous_user = AnonymousUser
  283.  
  284.  
  285. @login_manager.user_loader
  286. def load_user(user_id):
  287. return User.query.get(int(user_id))
  288.  
  289.  
  290. class Post(db.Model):
  291. __tablename__ = 'posts'
  292. id = db.Column(db.Integer, primary_key=True)
  293. body = db.Column(db.Text)
  294. body_html = db.Column(db.Text)
  295. timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
  296. author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
  297. comments = db.relationship('Comment', backref='post', lazy='dynamic')
  298.  
  299. @staticmethod
  300. def generate_fake(count=100):
  301. from random import seed, randint
  302. import forgery_py
  303.  
  304. seed()
  305. user_count = User.query.count()
  306. for i in range(count):
  307. u = User.query.offset(randint(0, user_count - 1)).first()
  308. p = Post(body=forgery_py.lorem_ipsum.sentences(randint(1, 5)),
  309. timestamp=forgery_py.date.date(True),
  310. author=u)
  311. db.session.add(p)
  312. db.session.commit()
  313.  
  314. @staticmethod
  315. def on_changed_body(target, value, oldvalue, initiator):
  316. allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code',
  317. 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul',
  318. 'h1', 'h2', 'h3', 'p']
  319. target.body_html = bleach.linkify(bleach.clean(
  320. markdown(value, output_format='html'),
  321. tags=allowed_tags, strip=True))
  322.  
  323. def to_json(self):
  324. json_post = {
  325. 'url': url_for('api.get_post', id=self.id, _external=True),
  326. 'body': self.body,
  327. 'body_html': self.body_html,
  328. 'timestamp': self.timestamp,
  329. 'author': url_for('api.get_user', id=self.author_id,
  330. _external=True),
  331. 'comments': url_for('api.get_post_comments', id=self.id,
  332. _external=True),
  333. 'comment_count': self.comments.count()
  334. }
  335. return json_post
  336.  
  337. @staticmethod
  338. def from_json(json_post):
  339. body = json_post.get('body')
  340. if body is None or body == '':
  341. raise ValidationError('post does not have a body')
  342. return Post(body=body)
  343.  
  344.  
  345. db.event.listen(Post.body, 'set', Post.on_changed_body)
  346.  
  347.  
  348. class Comment(db.Model):
  349. __tablename__ = 'comments'
  350. id = db.Column(db.Integer, primary_key=True)
  351. body = db.Column(db.Text)
  352. body_html = db.Column(db.Text)
  353. timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
  354. disabled = db.Column(db.Boolean)
  355. author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
  356. post_id = db.Column(db.Integer, db.ForeignKey('posts.id'))
  357.  
  358. @staticmethod
  359. def on_changed_body(target, value, oldvalue, initiator):
  360. allowed_tags = ['a', 'abbr', 'acronym', 'b', 'code', 'em', 'i',
  361. 'strong']
  362. target.body_html = bleach.linkify(bleach.clean(
  363. markdown(value, output_format='html'),
  364. tags=allowed_tags, strip=True))
  365.  
  366. def to_json(self):
  367. json_comment = {
  368. 'url': url_for('api.get_comment', id=self.id, _external=True),
  369. 'post': url_for('api.get_post', id=self.post_id, _external=True),
  370. 'body': self.body,
  371. 'body_html': self.body_html,
  372. 'timestamp': self.timestamp,
  373. 'author': url_for('api.get_user', id=self.author_id,
  374. _external=True),
  375. }
  376. return json_comment
  377.  
  378. @staticmethod
  379. def from_json(json_comment):
  380. body = json_comment.get('body')
  381. if body is None or body == '':
  382. raise ValidationError('comment does not have a body')
  383. return Comment(body=body)
  384.  
  385.  
  386. db.event.listen(Comment.body, 'set', Comment.on_changed_body)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement