Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ## -*- coding: utf-8 -*-
- ############################################
- ## File : auth.py
- ## Author : Sylvain Viollon
- ## Email : sylvain@infrae.com
- ## Creation Date : Fri May 16 10:28:49 2008 CEST
- ## Last modification : Mon Jul 14 13:30:39 2008 CEST
- ############################################
- __author__ ="sylvain@infrae.com"
- __format__ ="plaintext"
- __version__ ="$Id$"
- from authkit.authenticate import middleware as AuthenticateMiddleware
- from authkit.authorize import middleware as AuthorizeMiddleware
- from authkit.permissions import RemoteUser
- from watlabws.odbc import ODBCConnectionPool
- import csv
- class FileAuthChecker(object):
- """Check the authentication using a CSV file.
- """
- def __init__(self, filename):
- self.filename = filename
- def __call__(self, environ, user, password):
- csv_file = csv.reader(open(self.filename))
- for csv_user, csv_password in csv_file:
- if (csv_user == user) and (csv_password == password):
- return True
- return False
- SQL_REQUEST = """select %(login_column)s as username, %(password_column)s as password
- from %(user_table)s
- where %(login_column)s = '%%s'"""
- class ODBCAuthChecker(object):
- """Check the authentication using a ODBC connection.
- """
- def __init__(self, dsn, user_table, login_column, password_column,
- number_connection=1, max_request=1000):
- sql_args = dict(login_column=login_column,
- password_column=password_column,
- user_table=user_table)
- self.sql_request = SQL_REQUEST % sql_args
- self.connection = ODBCConnectionPool(dsn, number_connection, max_request)
- def __call__(self, environ, user, password):
- entries = list(self.connection.execute(self.sql_request % user))
- if len(entries) != 1:
- return False
- if (entries[0]['username'] == user and
- entries[0]['password'] == password):
- return True
- return False
- def authorize_filter(global_conf):
- """Create a middleware to authorize.
- """
- def filter(app):
- return AuthorizeMiddleware(app, RemoteUser())
- return filter
- def buildAuthenticationMiddleware(checkAuth, conf):
- """Build a middleware checking the authentication using the given
- auth checker.
- """
- def filter(app):
- return AuthenticateMiddleware(app,
- setup_method=conf.get('method', 'basic'),
- basic_realm=conf.get('realm', 'Auth Realm'),
- basic_authenticate_function=checkAuth)
- return filter
- def authenticate_text_filter(global_conf, passfile, **conf):
- """Create a middleware for authentication using a CSV file.
- """
- checkAuth = FileAuthChecker(passfile)
- return buildAuthenticationMiddleware(checkAuth, conf)
- def authenticate_odbc_filter(global_conf, dsn, user_table, login_column,
- password_column, number_connection=1, max_request=1000,
- **conf):
- """Create a middleware for authentication using an mxODBC connection.
- """
- checkAuth = ODBCAuthChecker(dsn, user_table, login_column, password_column,
- number_connection, max_request)
- return buildAuthenticationMiddleware(checkAuth, conf)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement