Advertisement
Guest User

Untitled

a guest
Jul 17th, 2016
123
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.96 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. import aioredis
  3. import asyncio
  4. import logging
  5.  
  6.  
  class SessionWatcher:
      """Drop player sessions when their Redis keys expire.

      Every live session is mirrored as a Redis key with a TTL (see
      ``prolong_session``).  The watcher subscribes to the Redis keyspace
      notification channel for expired keys of the configured database and,
      for each expired key, removes the matching entries from the in-memory
      dictionaries and unbinds the player in the system database.

      Typical life cycle: ``connect()`` -> ``start_watch()`` -> ... -> ``stop()``.
      """

      def __init__(self, app, redis_conf, session_ttl, system_db,
                   cache_dict, session_dict):
          """Store collaborators; no I/O happens until ``connect()``.

          :param app: application object; only its ``loop`` attribute is used.
          :param redis_conf: mapping with the keys ``host``, ``port``, ``db``,
              ``minsize`` and ``maxsize``; ``password`` and ``encoding`` are
              optional (``encoding`` defaults to ``'utf-8'``).
          :param session_ttl: default expiration handed to Redis as the
              ``expire`` argument of ``SET`` (presumably seconds -- confirm
              against the aioredis version in use).
          :param system_db: object exposing an awaitable
              ``unbind_player(player_id)``.
          :param cache_dict: mutable mapping ``session_id -> player``; each
              player must expose an ``id`` attribute.
          :param session_dict: mutable mapping keyed by ``player.id``.
          """
          self._app = app
          self._session_ttl = session_ttl

          self._cache_dict = cache_dict
          self._session_dict = session_dict
          self._system_db = system_db

          self._redis_conf = redis_conf
          self._redis_pool = None

          self._listen_connection = None

          # Strong references to background tasks: the event loop itself only
          # keeps weak ones, so an unreferenced task may be collected mid-run.
          self._tasks = set()

      async def connect(self):
          """Create the Redis pool, reserve the pub/sub connection and enable
          expired-key notifications on the server."""
          conf = self._redis_conf
          self._redis_pool = await aioredis.create_pool(
              (conf['host'], conf['port']),
              db=conf['db'],
              password=conf.get('password'),
              minsize=conf['minsize'],
              maxsize=conf['maxsize'],
              encoding=conf.get('encoding', 'utf-8'),
              loop=self._app.loop,
          )

          # Dedicated connection used later for SUBSCRIBE; it stays checked
          # out of the pool until stop() releases it.
          self._listen_connection = await self._redis_pool.acquire()

          with await self._redis_pool as redis_connection:  # type: aioredis.Redis
              res = await redis_connection.ping()
              logging.info("SessionWatcher redis ping answer is %s", res)

              # TODO: right now we set config parameters for pubsub notifications
              # TODO: but if we are sure that at deployment parameters will be others
              # TODO: it is necessary to fix this code
              # for more details see here: http://redis.io/topics/notifications
              await redis_connection.config_set('notify-keyspace-events', 'Ex')

      async def stop(self):
          """Unsubscribe, delete the Redis keys of all cached sessions and
          clear the pool.

          Safe to call even if ``connect()`` was never awaited.
          """
          if self._listen_connection is not None:
              await self._listen_connection.unsubscribe(self._build_channel_name())
              self._redis_pool.release(self._listen_connection)
              self._listen_connection = None

          if self._redis_pool is not None:
              if self._cache_dict:
                  with await self._redis_pool as redis_connection:  # type: aioredis.Redis
                      await redis_connection.delete(*self._cache_dict.keys())

              await self._redis_pool.clear()

          self._app = None

      async def start_watch(self):
          """Flush the session database and start listening for expirations.

          WARNING: this issues ``FLUSHDB`` on the configured database, so that
          database must be dedicated to session keys.
          """
          # Needed when the service was interrupted abnormally: flush the
          # sessions left over from the previous run.
          with await self._redis_pool as redis_connection:  # type: aioredis.Redis
              await redis_connection.flushdb()

          channel, = await self._listen_connection.subscribe(self._build_channel_name())
          self._spawn(self._listen_channel(channel))

      async def prolong_session(self, session_id, ttl=None):
          """(Re)create the Redis key for *session_id* with a fresh expiration.

          :param session_id: Redis key identifying the session.
          :param ttl: expiration to apply; ``None`` means ``session_ttl``.
          """
          if ttl is None:
              ttl = self._session_ttl

          with await self._redis_pool as redis_connection:  # type: aioredis.Redis
              # The value is irrelevant; only the key and its TTL matter.
              await redis_connection.set(session_id, 0, expire=ttl)

      async def _listen_channel(self, channel):
          """Consume expiration events from *channel* until it is closed and
          schedule ``_delete_session`` for every expired key."""
          while await channel.wait_message():
              msg = await channel.get()
              if msg is None:
                  break

              # Depending on the connection encoding the payload may already
              # be text; decode only raw bytes.
              if isinstance(msg, bytes):
                  msg = msg.decode('utf-8')
              logging.debug('Got Message: %s', msg)
              self._spawn(self._delete_session(msg))

      async def _delete_session(self, session_id):
          """Forget the session *session_id* and unbind its player.

          Expirations of keys that are not in the cache are logged and ignored.
          """
          # TODO: do not delete session if player in battle
          try:
              player = self._cache_dict.pop(session_id)
          except KeyError:
              logging.warning('received expiration of unknown session_id. ignoring')
              return

          # Tolerate a missing entry so that the player is still unbound even
          # if the two dictionaries have drifted out of sync.
          self._session_dict.pop(player.id, None)

          await self._system_db.unbind_player(player.id)

      def _spawn(self, coro):
          """Schedule *coro* as a background task and keep it referenced
          until it finishes; return the task."""
          # stop() resets self._app; fall back to the current event loop then.
          loop = self._app.loop if self._app is not None else None
          task = asyncio.ensure_future(coro, loop=loop)
          self._tasks.add(task)
          task.add_done_callback(self._tasks.discard)
          return task

      def _build_channel_name(self):
          """Return the key-event channel that announces expired keys of the
          configured database."""
          return '__keyevent@{db}__:expired'.format(db=self._redis_conf['db'])

      @property
      def session_ttl(self):
          """Default session expiration given to the constructor."""
          return self._session_ttl
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement