Advertisement
Guest User

Custom Pyramid Authentication class

a guest
Apr 1st, 2015
474
1
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.04 KB | None | 1 0
  1. import binascii
  2. import base64
  3. import warnings
  4.  
  5. from pyramid.httpexceptions import HTTPFound
  6. from pyramid.authentication import CallbackAuthenticationPolicy
  7. from pyramid.authentication import AuthTktCookieHelper
  8. from pyramid.authentication import b64decode
  9.  
  10. from pyramid.security import forget  # authenticated_userid,
  11.  
  12.  
  13. from ..models.Authentication import is_user_enabled
  14. from ..models.Authentication import getGroups
  15. from ..models.Authentication import _get_userid
  16.  
  17.  
  18. from datetime import datetime
  19. from time import time
  20. import pickle
  21.  
# Sentinel default for ``hashalg`` in ``AuthPolicy.__init__``: lets the
# constructor detect that the caller passed no value at all.
_marker = object()
  23.  
  24.  
  25. class AuthPolicy(CallbackAuthenticationPolicy):
  26.  
  27.     def __init__(self,
  28.                  secret=None,
  29.                  check=None,
  30.                  realm='Prepaid',
  31.                  cookie_name='auth_tkt',
  32.                  secure=False,
  33.                  include_ip=False,
  34.                  timeout=None,
  35.                  reissue_time=None,
  36.                  max_age=None,
  37.                  path="/",
  38.                  http_only=False,
  39.                  wild_domain=True,
  40.                  debug=False,
  41.                  hashalg=_marker,
  42.                  parent_domain=False,
  43.                  domain=None,):
  44.  
  45.         if hashalg is _marker:
  46.             hashalg = 'md5'
  47.             warnings.warn(
  48.                 'The MD5 hash function used by default by the '
  49.                 'AuthTktAuthenticationPolicy is known to be '
  50.                 'susceptible to collision attacks.  It is the current default '
  51.                 'for backwards compatibility reasons, but we recommend that '
  52.                 'you use the SHA512 algorithm instead for improved security.  '
  53.                 'Pass ``hashalg=\'sha512\'`` to the '
  54.                 'AuthTktAuthenticationPolicy constructor to do so.\n\nNote '
  55.                 'that a change to the hash algorithms will invalidate existing '
  56.                 'auth tkt cookies set by your application.  If backwards '
  57.                 'compatibility of existing auth tkt cookies is of greater '
  58.                 'concern than the risk posed by the potential for a hash '
  59.                 'collision, you\'ll want to continue using MD5 explicitly.  '
  60.                 'To do so, pass ``hashalg=\'md5\'`` in your application to '
  61.                 'the AuthTktAuthenticationPolicy constructor.   When you do so '
  62.                 'this warning will not be emitted again.  The default '
  63.                 'algorithm used in this policy will change in the future, so '
  64.                 'setting an explicit hashalg will futureproof your '
  65.                 'application.',
  66.                 DeprecationWarning,
  67.                 stacklevel=2
  68.             )
  69.         self.cookie = AuthTktCookieHelper(
  70.             secret=secret,
  71.             cookie_name=cookie_name,
  72.             secure=secure,
  73.             include_ip=include_ip,
  74.             timeout=timeout,
  75.             reissue_time=reissue_time,
  76.             max_age=max_age,
  77.             http_only=http_only,
  78.             path=path,
  79.             wild_domain=wild_domain,
  80.             hashalg=hashalg,
  81.             parent_domain=parent_domain,
  82.             domain=domain,
  83.         )
  84.         self.check = check
  85.         self.realm = realm
  86.         self.debug = debug
  87.  
  88.     def unauthenticated_userid(self, request):
  89.         """ The userid parsed from the ``Authorization`` request header
  90.                            OR
  91.            The userid key within the auth_tkt cookie."""
  92.         result = self.cookie.identify(request)
  93.         credentials = self._get_credentials(request)
  94.         if credentials:
  95.             return credentials[0]
  96.         if result:
  97.             return result['userid']
  98.  
  99.     def remember(self, request, userid, **kw):
  100.         """ A no-op. Basic authentication does not provide a protocol for
  101.        remembering the user. Credentials are sent on every request.
  102.                            OR
  103.        Accepts the following kw args: ``max_age=<int-seconds>,
  104.        ``tokens=<sequence-of-ascii-strings>``.
  105.  
  106.        Return a list of headers which will set appropriate cookies on
  107.        the response.
  108.        """
  109.         credentials = self._get_credentials(request)
  110.         if credentials:
  111.             return []
  112.         else:
  113.             return self.cookie.remember(request, userid, **kw)
  114.  
  115.     def forget(self, request):
  116.         """ Returns challenge headers. This should be attached to a response
  117.        to indicate that credentials are required.
  118.                                OR
  119.        A list of headers which will delete appropriate cookies."""
  120.         credentials = self._get_credentials(request)
  121.         if credentials:
  122.             return [('WWW-Authenticate', 'Basic realm="%s"' % self.realm)]
  123.         else:
  124.             return self.cookie.forget(request)
  125.  
  126.     def callback(self, userid, request):
  127.         # Username arg is ignored.  Unfortunately _get_credentials winds up
  128.         # getting called twice when authenticated_userid is called.  Avoiding
  129.         # that, however, winds up duplicating logic from the superclass.
  130.         credentials = self._get_credentials(request)
  131.         if credentials:
  132.             username, password = credentials
  133.             base = _get_userid(request.authorization, request)
  134.             name = "pimssess:%s" % base64.b64encode(
  135.                 request.remote_addr + ":" + base)
  136.             if not request.redis.exists("pimssess:%s" % base64.b64encode(request.remote_addr + ":" + base)):
  137.                 self.set_session(userid, request, refresh=300, name=name)
  138.             return self.check(username, password, request)
  139.         else:
  140.             if userid and not request.session.get("user"):
  141.                 request.session[
  142.                     "login.message"] = "You have been logged off automatically due to session timeout"
  143.                 raise HTTPFound(location='', headers=forget(request))
  144.             if userid and request.session.get("user") and request.session.get("user").username != userid:
  145.                 raise HTTPFound(location='', headers=forget(request))
  146.             if userid and not is_user_enabled(userid):
  147.                 request.session[
  148.                     "login.message"] = "Your Account has been disabled, you have been logged out."
  149.                 raise HTTPFound(location='/', headers=forget(request))
  150.             name = "pimssess:%s" % request.session.session_id
  151.             self.set_session(userid, request, name=name)
  152.             return getGroups(userid)
  153.  
  154.     def set_session(self, userid, request, name="pimssess", refresh=30):
  155.         try:
  156.             store = pickle.loads(request.redis.get(name))
  157.         except:
  158.             store = {
  159.                 'login': datetime.now(), 'userid': userid, "laststamp": datetime.now()}
  160.         if store.get('path') != request.path or time() - store["lasttime"] > refresh:
  161.             store["laststamp"] = datetime.now()
  162.             store["lasttime"] = time()
  163.             store["path"] = request.path
  164.             store["method"] = request.method
  165.             store["ipaddr"] = request.remote_addr
  166.             store["auth.attempts"] = {}
  167.             request.redis.set(name, pickle.dumps(store))
  168.             request.redis.expire(name, request.session.timeout)
  169.  
  170.     def _get_credentials(self, request):
  171.         authorization = request.headers.get('Authorization')
  172.         if not authorization:
  173.             return None
  174.         try:
  175.             authmeth, auth = authorization.split(' ', 1)
  176.         except ValueError:  # not enough values to unpack
  177.             return None
  178.         if authmeth.lower() != 'basic':
  179.             return None
  180.  
  181.         try:
  182.             authbytes = b64decode(auth.strip())
  183.         except (TypeError, binascii.Error):  # can't decode
  184.             return None
  185.  
  186.         # try utf-8 first, then latin-1; see discussion in
  187.         # https://github.com/Pylons/pyramid/issues/898
  188.         try:
  189.             auth = authbytes.decode('utf-8')
  190.         except UnicodeDecodeError:
  191.             auth = authbytes.decode('latin-1')
  192.  
  193.         try:
  194.             username, password = auth.split(':', 1)
  195.         except ValueError:  # not enough values to unpack
  196.             return None
  197.         return username, password
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement