Advertisement
Guest User

Untitled

a guest
Aug 18th, 2016
282
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.83 KB | None | 0 0
  1. from datetime import datetime
  2. import hashlib
  3. from werkzeug.security import generate_password_hash, check_password_hash
  4. from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
  5. from markdown import markdown
  6. import bleach
  7. from flask import current_app, request, url_for
  8. from flask.ext.login import UserMixin, AnonymousUserMixin
  9. from app.exceptions import ValidationError
  10. from . import db, login_manager
  11.  
  12.  
  13. class Permission:
  14. FOLLOW = 0x01
  15. COMMENT = 0x02
  16. WRITE_ARTICLES = 0x04
  17. MODERATE_COMMENTS = 0x08
  18. ADMINISTER = 0x80
  19.  
  20.  
  21. class Role(db.Model):
  22. __tablename__ = 'roles'
  23. id = db.Column(db.Integer, primary_key=True)
  24. name = db.Column(db.String(64), unique=True)
  25. default = db.Column(db.Boolean, default=False, index=True)
  26. permissions = db.Column(db.Integer)
  27. users = db.relationship('User', backref='role', lazy='dynamic')
  28.  
  29. @staticmethod
  30. def insert_roles():
  31. roles = {
  32. 'User': (Permission.FOLLOW |
  33. Permission.COMMENT |
  34. Permission.WRITE_ARTICLES, True),
  35. 'Moderator': (Permission.FOLLOW |
  36. Permission.COMMENT |
  37. Permission.WRITE_ARTICLES |
  38. Permission.MODERATE_COMMENTS, False),
  39. 'Administrator': (0xff, False)
  40. }
  41.  
  42. for r in roles:
  43. role = Role.query.filter_by(name=r).first()
  44. if role is None:
  45. role = Role(name=r)
  46. role.permissions = roles[r][0]
  47. role.default = roles[r][1]
  48. db.session.add(role)
  49. db.session.commit()
  50.  
  51. def __repr__(self):
  52. return '<Role %r>' % self.name
  53.  
  54.  
  55. class Follow(db.Model):
  56. __tablename__ = 'follows'
  57. follower_id = db.Column(db.Integer, db.ForeignKey('users.id'),
  58. primary_key=True)
  59. followed_id = db.Column(db.Integer, db.ForeignKey('users.id'),
  60. primary_key=True)
  61. timestamp = db.Column(db.DateTime, default=datetime.utcnow)
  62.  
  63.  
  64. class User(UserMixin, db.Model):
  65. __tablename__ = 'users'
  66. id = db.Column(db.Integer, primary_key=True)
  67. email = db.Column(db.String(64), unique=True, index=True)
  68. username = db.Column(db.String(64), unique=True, index=True)
  69. role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
  70. password_hash = db.Column(db.String(128))
  71. confirmed = db.Column(db.Boolean, default=False)
  72. name = db.Column(db.String(64))
  73. location = db.Column(db.String(64))
  74. about_me = db.Column(db.Text())
  75. member_since = db.Column(db.DateTime(), default=datetime.utcnow)
  76. last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
  77. avatar_hash = db.Column(db.String(32))
  78. posts = db.relationship('Post', backref='author', lazy='dynamic')
  79. followed = db.relationship('Follow',
  80. foreign_keys=[Follow.follower_id],
  81. backref=db.backref('follower', lazy='joined'),
  82. lazy='dynamic',
  83. cascade='all, delete-orphan')
  84. followers = db.relationship('Follow',
  85. foreign_keys=[Follow.followed_id],
  86. backref=db.backref('followed', lazy='joined'),
  87. lazy='dynamic',
  88. cascade='all, delete-orphan')
  89. comments = db.relationship('Comment', backref='author', lazy='dynamic')
  90.  
  91. @staticmethod
  92. def generate_fake(count=100):
  93. from sqlalchemy.exc import IntegrityError
  94. from random import seed
  95. import forgery_py
  96.  
  97. seed()
  98. for i in range(count):
  99. u = User(email=forgery_py.internet.email_address(),
  100. username=forgery_py.internet.user_name(True),
  101. password=forgery_py.lorem_ipsum.word(),
  102. confirmed=True,
  103. name=forgery_py.name.full_name(),
  104. location=forgery_py.address.city(),
  105. about_me=forgery_py.lorem_ipsum.sentence(),
  106. member_since=forgery_py.date.date(True))
  107. db.session.add(u)
  108. try:
  109. db.session.commit()
  110. except IntegrityError:
  111. db.session.rollback()
  112.  
  113. @staticmethod
  114. def add_self_follows():
  115. for user in User.query.all():
  116. if not user.is_following(user):
  117. user.follow(user)
  118. db.session.add(user)
  119. db.session.commit()
  120.  
  121. def __init__(self, **kwargs):
  122. super(User, self).__init__(**kwargs)
  123. if self.role is None:
  124. if self.email == current_app.config['PROJECT_ADMIN']:
  125. self.role = Role.query.filter_by(permissions=0xff).first()
  126. if self.role is None:
  127. self.role = Role.query.filter_by(default=True).first()
  128. if self.email is not None and self.avatar_hash is None:
  129. self.avatar_hash = hashlib.md5(
  130. self.email.encode('utf-8')).hexdigest()
  131. self.followed.append(Follow(followed=self))
  132.  
  133. @property
  134. def password(self):
  135. raise AttributeError('password is not a readable attribute')
  136.  
  137. @password.setter
  138. def password(self, password):
  139. self.password_hash = generate_password_hash(password)
  140.  
  141. def verify_password(self, password):
  142. return check_password_hash(self.password_hash, password)
  143.  
  144. def generate_confirmation_token(self, expiration=3600):
  145. s = Serializer(current_app.config['SECRET_KEY'], expiration)
  146. return s.dumps({'confirm': self.id})
  147.  
  148. def confirm(self, token):
  149. s = Serializer(current_app.config['SECRET_KEY'])
  150. try:
  151. data = s.loads(token)
  152. except:
  153. return False
  154. if data.get('confirm') != self.id:
  155. return False
  156. self.confirmed = True
  157. db.session.add(self)
  158. return True
  159.  
  160. def generate_reset_token(self, expiration=3600):
  161. s = Serializer(current_app.config['SECRET_KEY'], expiration)
  162. return s.dumps({'reset': self.id})
  163.  
  164. def reset_password(self, token, new_password):
  165. s = Serializer(current_app.config['SECRET_KEY'])
  166. try:
  167. data = s.loads(token)
  168. except:
  169. return False
  170. if data.get('reset') != self.id:
  171. return False
  172. self.password = new_password
  173. db.session.add(self)
  174. return True
  175.  
  176. def generate_email_change_token(self, new_email, expiration=3600):
  177. s = Serializer(current_app.config['SECRET_KEY'], expiration)
  178. return s.dumps({'change_email': self.id, 'new_email': new_email})
  179.  
  180. def change_email(self, token):
  181. s = Serializer(current_app.config['SECRET_KEY'])
  182. try:
  183. data = s.loads(token)
  184. except:
  185. return False
  186. if data.get('change_email') != self.id:
  187. return False
  188. new_email = data.get('new_email')
  189. if new_email is None:
  190. return False
  191. if self.query.filter_by(email=new_email).first() is not None:
  192. return False
  193. self.email = new_email
  194. self.avatar_hash = hashlib.md5(
  195. self.email.encode('utf-8')).hexdigest()
  196. db.session.add(self)
  197. return True
  198.  
  199. def can(self, permissions):
  200. return self.role is not None and \
  201. (self.role.permissions & permissions) == permissions
  202.  
  203. def is_administrator(self):
  204. return self.can(Permission.ADMINISTER)
  205.  
  206. def ping(self):
  207. self.last_seen = datetime.utcnow()
  208. db.session.add(self)
  209.  
  210. def gravatar(self, size=100, default='identicon', rating='g'):
  211. if request.is_secure:
  212. url = 'https://secure.gravatar.com/avatar'
  213. else:
  214. url = 'http://www.gravatar.com/avatar'
  215. hash = self.avatar_hash or hashlib.md5(
  216. self.email.encode('utf-8')).hexdigest()
  217. return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(
  218. url=url, hash=hash, size=size, default=default, rating=rating)
  219.  
  220. def follow(self, user):
  221. if not self.is_following(user):
  222. f = Follow(follower=self, followed=user)
  223. db.session.add(f)
  224.  
  225. def unfollow(self, user):
  226. f = self.followed.filter_by(followed_id=user.id).first()
  227. if f:
  228. db.session.delete(f)
  229.  
  230. def is_following(self, user):
  231. return self.followed.filter_by(
  232. followed_id=user.id).first() is not None
  233.  
  234. def is_followed_by(self, user):
  235. return self.followers.filter_by(
  236. follower_id=user.id).first() is not None
  237.  
  238. @property
  239. def followed_posts(self):
  240. return Post.query.join(Follow, Follow.followed_id == Post.author_id)\
  241. .filter(Follow.follower_id == self.id)
  242.  
  243. def to_json(self):
  244. json_user = {
  245. 'url': url_for('api.get_user', id=self.id, _external=True),
  246. 'username': self.username,
  247. 'member_since': self.member_since,
  248. 'last_seen': self.last_seen,
  249. 'posts': url_for('api.get_user_posts', id=self.id, _external=True),
  250. 'followed_posts': url_for('api.get_user_followed_posts',
  251. id=self.id, _external=True),
  252. 'post_count': self.posts.count()
  253. }
  254. return json_user
  255.  
  256. def generate_auth_token(self, expiration):
  257. s = Serializer(current_app.config['SECRET_KEY'],
  258. expires_in=expiration)
  259. return s.dumps({'id': self.id}).decode('ascii')
  260.  
  261. @staticmethod
  262. def verify_auth_token(token):
  263. s = Serializer(current_app.config['SECRET_KEY'])
  264. try:
  265. data = s.loads(token)
  266. except:
  267. return None
  268. return User.query.get(data['id'])
  269.  
  270. def __repr__(self):
  271. return '<User %r>' % self.username
  272.  
  273.  
  274. class AnonymousUser(AnonymousUserMixin):
  275. def can(self, permissions):
  276. return False
  277.  
  278. def is_administrator(self):
  279. return False
  280.  
  281. login_manager.anonymous_user = AnonymousUser
  282.  
  283.  
  284. @login_manager.user_loader
  285. def load_user(user_id):
  286. return User.query.get(int(user_id))
  287.  
  288.  
  289. class Post(db.Model):
  290. __tablename__ = 'posts'
  291. id = db.Column(db.Integer, primary_key=True)
  292. body = db.Column(db.Text)
  293. body_html = db.Column(db.Text)
  294. timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
  295. author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
  296. comments = db.relationship('Comment', backref='post', lazy='dynamic')
  297.  
  298. @staticmethod
  299. def generate_fake(count=100):
  300. from random import seed, randint
  301. import forgery_py
  302.  
  303. seed()
  304. user_count = User.query.count()
  305. for i in range(count):
  306. u = User.query.offset(randint(0, user_count - 1)).first()
  307. p = Post(body=forgery_py.lorem_ipsum.sentences(randint(1, 5)),
  308. timestamp=forgery_py.date.date(True),
  309. author=u)
  310. db.session.add(p)
  311. db.session.commit()
  312.  
  313. @staticmethod
  314. def on_changed_body(target, value, oldvalue, initiator):
  315. allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code',
  316. 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul',
  317. 'h1', 'h2', 'h3', 'p']
  318. target.body_html = bleach.linkify(bleach.clean(
  319. markdown(value, output_format='html'),
  320. tags=allowed_tags, strip=True))
  321.  
  322. def to_json(self):
  323. json_post = {
  324. 'url': url_for('api.get_post', id=self.id, _external=True),
  325. 'body': self.body,
  326. 'body_html': self.body_html,
  327. 'timestamp': self.timestamp,
  328. 'author': url_for('api.get_user', id=self.author_id,
  329. _external=True),
  330. 'comments': url_for('api.get_post_comments', id=self.id,
  331. _external=True),
  332. 'comment_count': self.comments.count()
  333. }
  334. return json_post
  335.  
  336. @staticmethod
  337. def from_json(json_post):
  338. body = json_post.get('body')
  339. if body is None or body == '':
  340. raise ValidationError('post does not have a body')
  341. return Post(body=body)
  342.  
  343.  
  344. db.event.listen(Post.body, 'set', Post.on_changed_body)
  345.  
  346.  
  347. class Comment(db.Model):
  348. __tablename__ = 'comments'
  349. id = db.Column(db.Integer, primary_key=True)
  350. body = db.Column(db.Text)
  351. body_html = db.Column(db.Text)
  352. timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
  353. disabled = db.Column(db.Boolean)
  354. author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
  355. post_id = db.Column(db.Integer, db.ForeignKey('posts.id'))
  356.  
  357. @staticmethod
  358. def on_changed_body(target, value, oldvalue, initiator):
  359. allowed_tags = ['a', 'abbr', 'acronym', 'b', 'code', 'em', 'i',
  360. 'strong']
  361. target.body_html = bleach.linkify(bleach.clean(
  362. markdown(value, output_format='html'),
  363. tags=allowed_tags, strip=True))
  364.  
  365. def to_json(self):
  366. json_comment = {
  367. 'url': url_for('api.get_comment', id=self.id, _external=True),
  368. 'post': url_for('api.get_post', id=self.post_id, _external=True),
  369. 'body': self.body,
  370. 'body_html': self.body_html,
  371. 'timestamp': self.timestamp,
  372. 'author': url_for('api.get_user', id=self.author_id,
  373. _external=True),
  374. }
  375. return json_comment
  376.  
  377. @staticmethod
  378. def from_json(json_comment):
  379. body = json_comment.get('body')
  380. if body is None or body == '':
  381. raise ValidationError('comment does not have a body')
  382. return Comment(body=body)
  383.  
  384.  
  385. db.event.listen(Comment.body, 'set', Comment.on_changed_body)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement