Advertisement
Guest User

Untitled

a guest
Dec 2nd, 2016
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.50 KB | None | 0 0
  1. from gevent import monkey
  2. monkey.patch_socket()
  3. import logging
  4.  
  5. import gevent
  6. from gevent.queue import Queue
  7. import pymysql as db
  8.  
  9. logging.basicConfig(level=logging.DEBUG)
  10. LOGGER = logging.getLogger("connection_pool")
  11.  
  12.  
  13. class ConnectionPool:
  14. def __init__(self, db_config, time_to_sleep=30, test_run=False):
  15. self.username = db_config.get('user')
  16. self.password = db_config.get('password')
  17. self.host = db_config.get('host')
  18. self.port = int(db_config.get('port'))
  19. self.max_pool_size = 20
  20. self.test_run = test_run
  21. self.pool = None
  22. self.time_to_sleep = time_to_sleep
  23. self._initialize_pool()
  24.  
  25. def get_initialized_connection_pool(self):
  26. return self.pool
  27.  
  28. def _initialize_pool(self):
  29. self.pool = Queue(maxsize=self.max_pool_size)
  30. current_pool_size = self.pool.qsize()
  31. if current_pool_size < self.max_pool_size: # this is a redundant check, can be removed
  32. for _ in xrange(0, self.max_pool_size - current_pool_size):
  33. try:
  34. conn = db.connect(host=self.host,
  35. user=self.username,
  36. passwd=self.password,
  37. port=self.port)
  38. self.pool.put_nowait(conn)
  39.  
  40. except db.OperationalError, e:
  41. LOGGER.error("Cannot initialize connection pool - retrying in {} seconds".format(self.time_to_sleep))
  42. LOGGER.exception(e)
  43. break
  44. self._check_for_connection_loss()
  45.  
  46. def _re_initialize_pool(self):
  47. gevent.sleep(self.time_to_sleep)
  48. self._initialize_pool()
  49.  
  50. def _check_for_connection_loss(self):
  51. while True:
  52. conn = None
  53. if self.pool.qsize() > 0:
  54. conn = self.pool.get()
  55.  
  56. if not self._ping(conn):
  57. if self.test_run:
  58. self.port = 3306
  59.  
  60. self._re_initialize_pool()
  61.  
  62. else:
  63. self.pool.put_nowait(conn)
  64.  
  65. if self.test_run:
  66. break
  67. gevent.sleep(self.time_to_sleep)
  68.  
  69. def _ping(self, conn):
  70. try:
  71. if conn is None:
  72. conn = db.connect(host=self.host,
  73. user=self.username,
  74. passwd=self.password,
  75. port=self.port)
  76. cursor = conn.cursor()
  77. cursor.execute('select 1;')
  78. LOGGER.debug(cursor.fetchall())
  79. return True
  80.  
  81. except db.OperationalError, e:
  82. LOGGER.warn('Cannot connect to mysql - retrying in {} seconds'.format(self.time_to_sleep))
  83. LOGGER.exception(e)
  84. return False
  85.  
  86. # test (pytest compatible) -------------------------------------------------------------------------------------------
  87. import logging
  88.  
  89. from src.py.ConnectionPool import ConnectionPool
  90.  
  91. logging.basicConfig(level=logging.DEBUG)
  92. LOGGER = logging.getLogger("test_connection_pool")
  93.  
  94.  
  95. def test_get_initialized_connection_pool():
  96. config = {
  97. 'user': 'root',
  98. 'password': '',
  99. 'host': '127.0.0.1',
  100. 'port': 3305
  101. }
  102. conn_pool = ConnectionPool(config, time_to_sleep=5, test_run=True)
  103. pool = conn_pool.get_initialized_connection_pool()
  104. # when in test run the port will be switched back to 3306
  105. # so the queue size should be 20 - will be nice to work
  106. # around this rather than test_run hack
  107. assert pool.qsize() == 20
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement