Guest User

Untitled

a guest
Mar 6th, 2018
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.26 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2.  
  3. from PIL import Image
  4.  
  5. import os
  6.  
  7. from flask import (Blueprint, render_template, request, url_for, redirect, g,
  8. current_app, flash, abort)
  9. from flask_babel import gettext
  10. from sqlalchemy.exc import IntegrityError
  11. from sqlalchemy.orm.exc import NoResultFound
  12.  
  13. from db import db
  14. from models import Journalist, InvalidUsernameException, PasswordError
  15. from journalist_app.decorators import admin_required
  16. from journalist_app.utils import (make_password, commit_account_changes,
  17. set_diceware_password)
  18. from journalist_app.forms import LogoForm, NewUserForm
  19.  
  20.  
  21. def make_blueprint(config):
  22. view = Blueprint('admin', __name__)
  23.  
  24. @view.route('/', methods=('GET', 'POST'))
  25. @admin_required
  26. def index():
  27. users = Journalist.query.all()
  28. return render_template("admin.html", users=users)
  29.  
  30. @view.route('/config', methods=('GET', 'POST'))
  31. @admin_required
  32. def manage_config():
  33. form = LogoForm()
  34. if form.validate_on_submit():
  35. f = form.logo.data
  36. static_filepath = os.path.join(config.SECUREDROP_ROOT,
  37. "static/i/logo.png")
  38. temp_static_filepath = os.path.join(config.SECUREDROP_ROOT,
  39. "static/i/temp_logo.png")
  40. try:
  41. with Image.open(f) as im:
  42. imcopy = im.copy()
  43. imcopy.thumbnail((500, 450), resample=3)
  44. imcopy.save(static_filepath, "PNG")
  45. flash(gettext("Image updated."), "logo-success")
  46.  
  47. except Exception:
  48. flash("Unable to process the image file."
  49. " Try another one.", "logo-error")
  50.  
  51. finally:
  52. return redirect(url_for("admin.manage_config"))
  53.  
  54. else:
  55. for field, errors in form.errors.items():
  56. for error in errors:
  57. flash(error, "logo-error")
  58. return render_template("config.html", form=form)
  59.  
  60. @view.route('/add', methods=('GET', 'POST'))
  61. @admin_required
  62. def add_user():
  63. form = NewUserForm()
  64. if form.validate_on_submit():
  65. form_valid = True
  66. username = request.form['username']
  67. password = request.form['password']
  68. is_admin = bool(request.form.get('is_admin'))
  69.  
  70. try:
  71. otp_secret = None
  72. if request.form.get('is_hotp', False):
  73. otp_secret = request.form.get('otp_secret', '')
  74. new_user = Journalist(username=username,
  75. password=password,
  76. is_admin=is_admin,
  77. otp_secret=otp_secret)
  78. db.session.add(new_user)
  79. db.session.commit()
  80. except PasswordError:
  81. flash(gettext(
  82. 'There was an error with the autogenerated password. '
  83. 'User not created. Please try again.'), 'error')
  84. form_valid = False
  85. except InvalidUsernameException as e:
  86. form_valid = False
  87. flash('Invalid username: ' + str(e), "error")
  88. except IntegrityError as e:
  89. db.session.rollback()
  90. form_valid = False
  91. if "UNIQUE constraint failed: journalists.username" in str(e):
  92. flash(gettext("That username is already in use"),
  93. "error")
  94. else:
  95. flash(gettext("An error occurred saving this user"
  96. " to the database."
  97. " Please inform your administrator."),
  98. "error")
  99. current_app.logger.error("Adding user "
  100. "'{}' failed: {}".format(
  101. username, e))
  102.  
  103. if form_valid:
  104. return redirect(url_for('admin.new_user_two_factor',
  105. uid=new_user.id))
  106.  
  107. return render_template("admin_add_user.html",
  108. password=make_password(config),
  109. form=form)
  110.  
  111. @view.route('/2fa', methods=('GET', 'POST'))
  112. @admin_required
  113. def new_user_two_factor():
  114. user = Journalist.query.get(request.args['uid'])
  115.  
  116. if request.method == 'POST':
  117. token = request.form['token']
  118. if user.verify_token(token):
  119. flash(gettext(
  120. "Token in two-factor authentication "
  121. "accepted for user {user}.").format(
  122. user=user.username),
  123. "notification")
  124. return redirect(url_for("admin.index"))
  125. else:
  126. flash(gettext(
  127. "Could not verify token in two-factor authentication."),
  128. "error")
  129.  
  130. return render_template("admin_new_user_two_factor.html", user=user)
  131.  
  132. @view.route('/reset-2fa-totp', methods=['POST'])
  133. @admin_required
  134. def reset_two_factor_totp():
  135. uid = request.form['uid']
  136. user = Journalist.query.get(uid)
  137. user.is_totp = True
  138. user.regenerate_totp_shared_secret()
  139. db.session.commit()
  140. return redirect(url_for('admin.new_user_two_factor', uid=uid))
  141.  
  142. @view.route('/reset-2fa-hotp', methods=['POST'])
  143. @admin_required
  144. def reset_two_factor_hotp():
  145. uid = request.form['uid']
  146. otp_secret = request.form.get('otp_secret', None)
  147. if otp_secret:
  148. user = Journalist.query.get(uid)
  149. try:
  150. user.set_hotp_secret(otp_secret)
  151. except TypeError as e:
  152. if "Non-hexadecimal digit found" in str(e):
  153. flash(gettext(
  154. "Invalid secret format: "
  155. "please only submit letters A-F and numbers 0-9."),
  156. "error")
  157. elif "Odd-length string" in str(e):
  158. flash(gettext(
  159. "Invalid secret format: "
  160. "odd-length secret. Did you mistype the secret?"),
  161. "error")
  162. else:
  163. flash(gettext(
  164. "An unexpected error occurred! "
  165. "Please inform your administrator."), "error")
  166. current_app.logger.error(
  167. "set_hotp_secret '{}' (id {}) failed: {}".format(
  168. otp_secret, uid, e))
  169. return render_template('admin_edit_hotp_secret.html', uid=uid)
  170. else:
  171. db.session.commit()
  172. return redirect(url_for('admin.new_user_two_factor', uid=uid))
  173. else:
  174. return render_template('admin_edit_hotp_secret.html', uid=uid)
  175.  
  176. @view.route('/edit/<int:user_id>', methods=('GET', 'POST'))
  177. @admin_required
  178. def edit_user(user_id):
  179. user = Journalist.query.get(user_id)
  180.  
  181. if request.method == 'POST':
  182. if request.form.get('username', None):
  183. new_username = request.form['username']
  184.  
  185. try:
  186. Journalist.check_username_acceptable(new_username)
  187. except InvalidUsernameException as e:
  188. flash('Invalid username: ' + str(e), 'error')
  189. return redirect(url_for("admin.edit_user",
  190. user_id=user_id))
  191.  
  192. if new_username == user.username:
  193. pass
  194. elif Journalist.query.filter_by(
  195. username=new_username).one_or_none():
  196. flash(gettext(
  197. 'Username "{user}" already taken.').format(
  198. user=new_username),
  199. "error")
  200. return redirect(url_for("admin.edit_user",
  201. user_id=user_id))
  202. else:
  203. user.username = new_username
  204.  
  205. user.is_admin = bool(request.form.get('is_admin'))
  206.  
  207. commit_account_changes(user)
  208.  
  209. password = make_password(config)
  210. return render_template("edit_account.html", user=user,
  211. password=password)
  212.  
  213. @view.route('/edit/<int:user_id>/new-password', methods=('POST',))
  214. @admin_required
  215. def set_password(user_id):
  216. try:
  217. user = Journalist.query.get(user_id)
  218. except NoResultFound:
  219. abort(404)
  220.  
  221. password = request.form.get('password')
  222. set_diceware_password(user, password)
  223. return redirect(url_for('admin.edit_user', user_id=user_id))
  224.  
  225. @view.route('/delete/<int:user_id>', methods=('POST',))
  226. @admin_required
  227. def delete_user(user_id):
  228. user = Journalist.query.get(user_id)
  229. if user_id == g.user.id:
  230. # Do not flash because the interface already has safe guards.
  231. # It can only happen by manually crafting a POST request
  232. current_app.logger.error(
  233. "Admin {} tried to delete itself".format(g.user.username))
  234. abort(403)
  235. elif user:
  236. db.session.delete(user)
  237. db.session.commit()
  238. flash(gettext("Deleted user '{user}'").format(
  239. user=user.username), "notification")
  240. else:
  241. current_app.logger.error(
  242. "Admin {} tried to delete nonexistent user with pk={}".format(
  243. g.user.username, user_id))
  244. abort(404)
  245.  
  246. return redirect(url_for('admin.index'))
  247.  
  248. @view.route('/edit/<int:user_id>/new-password', methods=('POST',))
  249. @admin_required
  250. def new_password(user_id):
  251. try:
  252. user = Journalist.query.get(user_id)
  253. except NoResultFound:
  254. abort(404)
  255.  
  256. password = request.form.get('password')
  257. set_diceware_password(user, password)
  258. return redirect(url_for('admin.edit_user', user_id=user_id))
  259.  
  260. @view.route('/ossec-test')
  261. @admin_required
  262. def ossec_test():
  263. current_app.logger.error('This is a test OSSEC alert')
  264. flash(gettext('Test alert sent. Check your email.'), 'notification')
  265. return redirect(url_for('admin.manage_config'))
  266.  
  267. return view
Add Comment
Please, Sign In to add comment