SHARE
TWEET

Redis sessions in Nagare (courtesy of expo)

Crumble Aug 28th, 2013 93 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. # -*- coding: utf-8 -*-
  2.  
  3. import redis
  4.  
  5. from nagare import local
  6. from nagare.sessions import ExpirationError, common
  7. from nagare.sessions.serializer import Pickle
  8.  
  9. KEY_PREFIX = 'nagare_%d_'
  10.  
  11.  
  12. class Sessions(common.Sessions):
  13.     """Sessions manager for sessions kept in an external redis server
  14.    """
  15.     spec = common.Sessions.spec.copy()
  16.     spec.update(dict(
  17.         host='string(default="127.0.0.1")',
  18.         port='integer(default=6379)',
  19.         db='integer(default=0)',
  20.         ttl='integer(default=0)',
  21.         lock_ttl='integer(default=0)',
  22.         lock_poll_time='float(default=0.1)',
  23.         reset='boolean(default=True)',
  24.         serializer='string(default="nagare.sessions.serializer:Pickle")'
  25.     ))
  26.  
  27.     def __init__(
  28.         self,
  29.         host='127.0.0.1',
  30.         port=6379,
  31.         db=0,
  32.         ttl=0,
  33.         lock_ttl=5,
  34.         lock_poll_time=0.1,
  35.         reset=False,
  36.         serializer=None,
  37.         **kw
  38.     ):
  39.         """Initialization
  40.  
  41.        In:
  42.          - ``host`` -- address of the memcache server
  43.          - ``port`` -- port of the memcache server
  44.          - ``ttl`` -- sessions and continuations timeout, in seconds (0 = no timeout)
  45.          - ``lock_ttl`` -- session locks timeout, in seconds (0 = no timeout)
  46.          - ``lock_poll_time`` -- wait time between two lock acquisition tries, in seconds
  47.          - ``reset`` -- do a reset of all the sessions on startup ?
  48.          - ``serializer`` -- serializer / deserializer of the states
  49.        """
  50.         super(Sessions, self).__init__(serializer=serializer or Pickle, **kw)
  51.  
  52.         self.host = host
  53.         self.port = port
  54.         self.db = db
  55.         self.ttl = ttl
  56.         self.lock_ttl = lock_ttl
  57.         self.lock_poll_time = lock_poll_time
  58.  
  59.         if reset:
  60.             self.flush_all()
  61.  
  62.     def set_config(self, filename, conf, error):
  63.         """Read the configuration parameters
  64.  
  65.        In:
  66.          - ``filename`` -- the path to the configuration file
  67.          - ``conf`` -- the ``ConfigObj`` object, created from the configuration file
  68.          - ``error`` -- the function to call in case of configuration errors
  69.        """
  70.         # Let's the super class validate the configuration file
  71.         conf = super(Sessions, self).set_config(filename, conf, error)
  72.  
  73.         for arg_name in (
  74.             'host', 'port', 'db', 'ttl', 'lock_ttl',
  75.                             'lock_poll_time',
  76.                             #'lock_max_wait_time',
  77.                             #'min_compress_len', 'debug'
  78.         ):
  79.             setattr(self, arg_name, conf[arg_name])
  80.  
  81.         if conf['reset']:
  82.             self.flush_all()
  83.  
  84.         return conf
  85.  
  86.     def _get_connection(self):
  87.         """Get the connection to the redis server
  88.  
  89.        Return:
  90.          - the connection
  91.        """
  92.         # The connection objects are local to the workers
  93.         connection = getattr(local.worker, 'redis_connection', None)
  94.  
  95.         if connection is None:
  96.             connection = redis.Redis(self.host, self.port, self.db)
  97.             local.worker.redis_connection = connection
  98.  
  99.         return connection
  100.  
  101.     def flush_all(self):
  102.         """Delete all the contents in the redis server
  103.        """
  104.         connection = self._get_connection()
  105.         connection.flushdb()
  106.  
  107.     def get_lock(self, session_id):
  108.         """Retrieve the lock of a session
  109.  
  110.        In:
  111.          - ``session_id`` -- session id
  112.  
  113.        Return:
  114.          - the lock
  115.        """
  116.         connection = self._get_connection()
  117.         lock = connection.lock(
  118.             (KEY_PREFIX + 'lock') % session_id,
  119.             self.lock_ttl,
  120.             self.lock_poll_time)
  121.         return lock
  122.  
  123.     def create(self, session_id, secure_id, lock):
  124.         """Create a new session
  125.  
  126.        In:
  127.          - ``session_id`` -- id of the session
  128.          - ``secure_id`` -- the secure number associated to the session
  129.          - ``lock`` -- the lock of the session
  130.        """
  131.         connection = self._get_connection()
  132.         connection = connection.pipeline()
  133.  
  134.         connection.hmset(
  135.             KEY_PREFIX % session_id, {
  136.                 'state': 0,
  137.                 'sess_id': secure_id,
  138.                 'sess_data': None,
  139.                 '00000': {}
  140.             })
  141.  
  142.         if self.ttl:
  143.             connection.expire(KEY_PREFIX % session_id, self.ttl)
  144.  
  145.         connection.execute()
  146.  
  147.     def delete(self, session_id):
  148.         """Delete the session
  149.  
  150.        In:
  151.          - ``session_id`` -- id of the session to delete
  152.        """
  153.         self._get_connection().delete(KEY_PREFIX % session_id)
  154.  
  155.     def fetch_state(self, session_id, state_id):
  156.         """Retrieve a state with its associated objects graph
  157.  
  158.        In:
  159.          - ``session_id`` -- session id of this state
  160.          - ``state_id`` -- id of this state
  161.  
  162.        Return:
  163.          - id of the latest state
  164.          - secure number associated to the session
  165.          - data kept into the session
  166.          - data kept into the state
  167.        """
  168.         state_id = '%05d' % state_id
  169.  
  170.         connection = self._get_connection()
  171.  
  172.         last_state_id, secure_id, session_data, state_data = connection.hmget(
  173.             KEY_PREFIX % session_id,
  174.             ('state', 'sess_id', 'sess_data', state_id)
  175.         )
  176.  
  177.         if not (secure_id and session_data and last_state_id and state_data):
  178.             raise ExpirationError()
  179.  
  180.         return int(last_state_id), secure_id, session_data, state_data
  181.  
  182.     def store_state(self, session_id, state_id, secure_id, use_same_state, session_data, state_data):
  183.         """Store a state and its associated objects graph
  184.  
  185.        In:
  186.          - ``session_id`` -- session id of this state
  187.          - ``state_id`` -- id of this state
  188.          - ``secure_id`` -- the secure number associated to the session
  189.          - ``use_same_state`` -- is this state to be stored in the previous snapshot?
  190.          - ``session_data`` -- data to keep into the session
  191.          - ``state_data`` -- data to keep into the state
  192.        """
  193.         connection = self._get_connection()
  194.         connection = connection.pipeline(True)
  195.  
  196.         if not use_same_state:
  197.             connection.hincrby(KEY_PREFIX % session_id, 'state', 1)
  198.  
  199.         connection.hmset(KEY_PREFIX % session_id, {
  200.             'sess_id': secure_id,
  201.             'sess_data': session_data,
  202.             '%05d' % state_id: state_data
  203.         })
  204.  
  205.         if self.ttl:
  206.             connection.expire(KEY_PREFIX % session_id, self.ttl)
  207.  
  208.         connection.execute()
RAW Paste Data
Top