Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import aioredis
- import asyncio
- import logging
class SessionWatcher:
    """Expire in-memory sessions when their Redis keys expire.

    Each session is represented by a Redis key named after the session id and
    stored with a TTL.  The watcher subscribes to the Redis ``expired``
    key-event channel of the configured database and, for every expired key,
    removes the matching entries from the local caches and asks the system
    database to unbind the player.

    The methods are generator-based coroutines (``@asyncio.coroutine`` with
    ``yield from``), matching the aioredis pool API this class is written
    against.
    """

    def __init__(self, app, redis_conf, session_ttl, system_db, cache_dict, session_dict):
        """Store collaborators; no connection is opened until ``connect()``.

        :param app: application object; only its ``loop`` attribute is used.
        :param redis_conf: mapping with the keys ``host``, ``port``, ``db``,
            ``minsize`` and ``maxsize``; ``password`` and ``encoding`` are
            optional.
        :param session_ttl: default TTL applied by ``prolong_session()``.
        :param system_db: object providing an ``unbind_player(player_id)``
            coroutine.
        :param cache_dict: mapping of session id to player object (the player
            must expose an ``id`` attribute).
        :param session_dict: mapping keyed by player id.
        """
        self._app = app
        self._session_ttl = session_ttl
        self._cache_dict = cache_dict
        self._session_dict = session_dict
        self._system_db = system_db
        self._redis_conf = redis_conf
        self._redis_pool = None
        # Connection reserved for the pub/sub subscription; acquired in
        # connect() and handed back to the pool in stop().
        self._listen_connection = None

    @asyncio.coroutine
    def connect(self):
        """Create the Redis pool, reserve the listening connection and
        enable expired-key notifications on the server."""
        address = (self._redis_conf['host'], self._redis_conf['port'])
        self._redis_pool = yield from aioredis.create_pool(
            address,
            db=self._redis_conf['db'],
            password=self._redis_conf.get('password'),
            minsize=self._redis_conf['minsize'],
            maxsize=self._redis_conf['maxsize'],
            encoding=self._redis_conf.get('encoding', 'utf-8'),
            loop=self._app.loop
        )
        self._listen_connection = yield from self._redis_pool.acquire()
        with (yield from self._redis_pool) as redis_connection:  # type: aioredis.Redis
            res = yield from redis_connection.ping()
            logging.info("SessionWatcher redis ping answer is {}".format(res))
            # TODO: right now we set config parameters for pubsub notifications
            # TODO: but if we are sure that at deployment parameters will be others
            # TODO: it is necessary to fix this code
            # for more details see here: http://redis.io/topics/notifications
            yield from redis_connection.config_set('notify-keyspace-events', 'Ex')

    @asyncio.coroutine
    def stop(self):
        """Unsubscribe, delete the cached session keys from Redis and clear
        the pool."""
        if self._listen_connection:
            yield from self._listen_connection.unsubscribe(self._build_channel_name())
            self._redis_pool.release(self._listen_connection)
            self._listen_connection = None
        # Skipped when the cache is empty, so delete() is never called
        # without any key arguments.
        if self._cache_dict:
            with (yield from self._redis_pool) as redis_connection:  # type: aioredis.Redis
                yield from redis_connection.delete(*self._cache_dict.keys())
        yield from self._redis_pool.clear()
        self._app = None

    @asyncio.coroutine
    def start_watch(self):
        """Flush the database, subscribe to the expiration channel and
        schedule the listener task on the application loop."""
        # Flushing is needed when the service was interrupted abnormally:
        # it removes sessions that are no longer valid.
        with (yield from self._redis_pool) as redis_connection:  # type: aioredis.Redis
            yield from redis_connection.flushdb()
        channel, = yield from self._listen_connection.subscribe(self._build_channel_name())
        # asyncio.ensure_future replaces asyncio.async, which was deprecated
        # in Python 3.4.4 and cannot even be parsed from Python 3.7 on.
        asyncio.ensure_future(self._listen_channel(channel), loop=self._app.loop)

    @asyncio.coroutine
    def prolong_session(self, session_id, ttl=None):
        """(Re)create the Redis key for ``session_id`` with a fresh TTL.

        :param session_id: name of the Redis key to set.
        :param ttl: expiration passed to Redis; defaults to ``session_ttl``
            when ``None``.
        """
        with (yield from self._redis_pool) as redis_connection:  # type: aioredis.Redis
            if ttl is None:
                ttl = self._session_ttl
            yield from redis_connection.set(session_id, 0, expire=ttl)

    @asyncio.coroutine
    def _listen_channel(self, channel):
        """Consume expiration messages and schedule a session removal for
        each one until the channel stops delivering messages."""
        while (yield from channel.wait_message()):
            msg = yield from channel.get()
            if msg is None:
                break
            # The message payload is the name of the expired key, i.e. the
            # session id.
            msg = msg.decode('utf-8')
            logging.debug('Got Message: {0}'.format(msg))
            # Scheduled as a separate task so that a slow unbind does not
            # hold up the processing of further messages.
            asyncio.ensure_future(self._delete_session(msg), loop=self._app.loop)

    @asyncio.coroutine
    def _delete_session(self, session_id):
        """Drop the cached player of an expired session and unbind it in
        the system database; unknown session ids are ignored."""
        # TODO: do not delete session if player in battle
        if session_id not in self._cache_dict:
            logging.warning('received expiration of unknown session_id. ignoring')
            return
        player = self._cache_dict.pop(session_id)
        self._session_dict.pop(player.id)
        yield from self._system_db.unbind_player(player.id)

    def _build_channel_name(self):
        """Return the key-event channel name for expirations in the
        configured Redis database."""
        return '__keyevent@{db}__:expired'.format(db=self._redis_conf['db'])

    @property
    def session_ttl(self):
        """Default session TTL supplied at construction time."""
        return self._session_ttl
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement