Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import binascii
- import base64
- import warnings
- from pyramid.httpexceptions import HTTPFound
- from pyramid.authentication import CallbackAuthenticationPolicy
- from pyramid.authentication import AuthTktCookieHelper
- from pyramid.authentication import b64decode
- from pyramid.security import forget # authenticated_userid,
- from ..models.Authentication import is_user_enabled
- from ..models.Authentication import getGroups
- from ..models.Authentication import _get_userid
- from datetime import datetime
- from time import time
- import pickle
- _marker = object()
- class AuthPolicy(CallbackAuthenticationPolicy):
- def __init__(self,
- secret=None,
- check=None,
- realm='Prepaid',
- cookie_name='auth_tkt',
- secure=False,
- include_ip=False,
- timeout=None,
- reissue_time=None,
- max_age=None,
- path="/",
- http_only=False,
- wild_domain=True,
- debug=False,
- hashalg=_marker,
- parent_domain=False,
- domain=None,):
- if hashalg is _marker:
- hashalg = 'md5'
- warnings.warn(
- 'The MD5 hash function used by default by the '
- 'AuthTktAuthenticationPolicy is known to be '
- 'susceptible to collision attacks. It is the current default '
- 'for backwards compatibility reasons, but we recommend that '
- 'you use the SHA512 algorithm instead for improved security. '
- 'Pass ``hashalg=\'sha512\'`` to the '
- 'AuthTktAuthenticationPolicy constructor to do so.\n\nNote '
- 'that a change to the hash algorithms will invalidate existing '
- 'auth tkt cookies set by your application. If backwards '
- 'compatibility of existing auth tkt cookies is of greater '
- 'concern than the risk posed by the potential for a hash '
- 'collision, you\'ll want to continue using MD5 explicitly. '
- 'To do so, pass ``hashalg=\'md5\'`` in your application to '
- 'the AuthTktAuthenticationPolicy constructor. When you do so '
- 'this warning will not be emitted again. The default '
- 'algorithm used in this policy will change in the future, so '
- 'setting an explicit hashalg will futureproof your '
- 'application.',
- DeprecationWarning,
- stacklevel=2
- )
- self.cookie = AuthTktCookieHelper(
- secret=secret,
- cookie_name=cookie_name,
- secure=secure,
- include_ip=include_ip,
- timeout=timeout,
- reissue_time=reissue_time,
- max_age=max_age,
- http_only=http_only,
- path=path,
- wild_domain=wild_domain,
- hashalg=hashalg,
- parent_domain=parent_domain,
- domain=domain,
- )
- self.check = check
- self.realm = realm
- self.debug = debug
- def unauthenticated_userid(self, request):
- """ The userid parsed from the ``Authorization`` request header
- OR
- The userid key within the auth_tkt cookie."""
- result = self.cookie.identify(request)
- credentials = self._get_credentials(request)
- if credentials:
- return credentials[0]
- if result:
- return result['userid']
- def remember(self, request, userid, **kw):
- """ A no-op. Basic authentication does not provide a protocol for
- remembering the user. Credentials are sent on every request.
- OR
- Accepts the following kw args: ``max_age=<int-seconds>,
- ``tokens=<sequence-of-ascii-strings>``.
- Return a list of headers which will set appropriate cookies on
- the response.
- """
- credentials = self._get_credentials(request)
- if credentials:
- return []
- else:
- return self.cookie.remember(request, userid, **kw)
- def forget(self, request):
- """ Returns challenge headers. This should be attached to a response
- to indicate that credentials are required.
- OR
- A list of headers which will delete appropriate cookies."""
- credentials = self._get_credentials(request)
- if credentials:
- return [('WWW-Authenticate', 'Basic realm="%s"' % self.realm)]
- else:
- return self.cookie.forget(request)
- def callback(self, userid, request):
- # Username arg is ignored. Unfortunately _get_credentials winds up
- # getting called twice when authenticated_userid is called. Avoiding
- # that, however, winds up duplicating logic from the superclass.
- credentials = self._get_credentials(request)
- if credentials:
- username, password = credentials
- base = _get_userid(request.authorization, request)
- name = "pimssess:%s" % base64.b64encode(
- request.remote_addr + ":" + base)
- if not request.redis.exists("pimssess:%s" % base64.b64encode(request.remote_addr + ":" + base)):
- self.set_session(userid, request, refresh=300, name=name)
- return self.check(username, password, request)
- else:
- if userid and not request.session.get("user"):
- request.session[
- "login.message"] = "You have been logged off automatically due to session timeout"
- raise HTTPFound(location='', headers=forget(request))
- if userid and request.session.get("user") and request.session.get("user").username != userid:
- raise HTTPFound(location='', headers=forget(request))
- if userid and not is_user_enabled(userid):
- request.session[
- "login.message"] = "Your Account has been disabled, you have been logged out."
- raise HTTPFound(location='/', headers=forget(request))
- name = "pimssess:%s" % request.session.session_id
- self.set_session(userid, request, name=name)
- return getGroups(userid)
- def set_session(self, userid, request, name="pimssess", refresh=30):
- try:
- store = pickle.loads(request.redis.get(name))
- except:
- store = {
- 'login': datetime.now(), 'userid': userid, "laststamp": datetime.now()}
- if store.get('path') != request.path or time() - store["lasttime"] > refresh:
- store["laststamp"] = datetime.now()
- store["lasttime"] = time()
- store["path"] = request.path
- store["method"] = request.method
- store["ipaddr"] = request.remote_addr
- store["auth.attempts"] = {}
- request.redis.set(name, pickle.dumps(store))
- request.redis.expire(name, request.session.timeout)
- def _get_credentials(self, request):
- authorization = request.headers.get('Authorization')
- if not authorization:
- return None
- try:
- authmeth, auth = authorization.split(' ', 1)
- except ValueError: # not enough values to unpack
- return None
- if authmeth.lower() != 'basic':
- return None
- try:
- authbytes = b64decode(auth.strip())
- except (TypeError, binascii.Error): # can't decode
- return None
- # try utf-8 first, then latin-1; see discussion in
- # https://github.com/Pylons/pyramid/issues/898
- try:
- auth = authbytes.decode('utf-8')
- except UnicodeDecodeError:
- auth = authbytes.decode('latin-1')
- try:
- username, password = auth.split(':', 1)
- except ValueError: # not enough values to unpack
- return None
- return username, password
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement