Advertisement
Guest User

Untitled

a guest
Sep 17th, 2019
109
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.37 KB | None | 0 0
  1. import logging
  2. import psycopg2
  3. from psycopg2.extras import RealDictCursor
  4. from psycopg2.extensions import TRANSACTION_STATUS_UNKNOWN, TRANSACTION_STATUS_IDLE
  5. from flask import g
  6. import threading
  7. import tenacity
  8. import uuid
  9. import pwd
  10. import os
  11.  
  12.  
# module-level logger named after this module; the connection pool and the
# database client below both write their messages to it
logger = logging.getLogger(__name__)
  15.  
  16.  
  17. class PoolError(psycopg2.Error):
  18. pass
  19.  
  20.  
  21. class ConnectionPool:
  22. def __init__(self, minconn, maxconn, *args, **kwargs):
  23. self.minconn = int(minconn)
  24. self.maxconn = int(maxconn)
  25.  
  26. self._args = args
  27. self._kwargs = kwargs
  28.  
  29. self._pool = [] # connections that are available
  30. self._used = {} # connections currently in use
  31.  
  32. # control access to the thread pool
  33. self._lock = threading.RLock()
  34.  
  35. def getconn(self, key):
  36. with self._lock:
  37. # this key already has a connection so return it
  38. if (key in self._used):
  39. return self._used[key]
  40.  
  41. # our pool is currently empty
  42. if (len(self._pool) == 0):
  43. # we've given out all of the connections that we want to
  44. if (len(self._used) == self.maxconn):
  45. raise PoolError("connection pool exhausted")
  46.  
  47. # get a connection but do it with a retry
  48. conn = self._connect()
  49.  
  50. # add to the list of available connections
  51. self._pool.append(conn)
  52.  
  53. # take a connection out of the pool and give it away
  54. self._used[key] = conn = self._pool.pop()
  55. return conn
  56.  
  57. def putconn(self, key, close=False):
  58. with self._lock:
  59. conn = self.getconn(key)
  60. if (conn is None):
  61. raise PoolError("no connection with that key")
  62.  
  63. if (len(self._pool) < self.minconn and not close):
  64. # Return the connection into a consistent state before putting
  65. # it back into the pool
  66. status = conn.info.transaction_status
  67. if (status == TRANSACTION_STATUS_UNKNOWN):
  68. # server connection lost
  69. conn.close()
  70. elif (status != TRANSACTION_STATUS_IDLE):
  71. # connection in error or in transaction
  72. conn.rollback()
  73. self._pool.append(conn)
  74. else:
  75. # regular idle connection
  76. self._pool.append(conn)
  77. else:
  78. conn.close()
  79.  
  80. # here we check for the presence of key because it can happen that
  81. # a thread tries to put back a connection after a call to close
  82. if (key in self._used):
  83. del self._used[key]
  84.  
  85. # retry with a random value between every 0.5 and 1.5 seconds
  86. @tenacity.retry(wait=tenacity.wait_fixed(0.5) + tenacity.wait_random(0, 1.5), before=tenacity.before_log(logger, logging.DEBUG))
  87. def _connect(self):
  88. # connect to the database with the arguments provided when the pool was
  89. # initialized. enable autocommit for consistency. this will retry using
  90. # the "tenacity" library.
  91. conn = psycopg2.connect(*self._args, **self._kwargs)
  92. conn.autocommit = True
  93. return conn
  94.  
  95.  
  96. class DatabaseClient:
  97. def __init__(self, app=None, **kwargs):
  98. if (app is not None):
  99. self.init_app(app, **kwargs)
  100. else:
  101. self.app = None
  102.  
  103. def init_app(self, app, key="default", minconn=2, maxconn=32, **kwargs):
  104. """
  105. The key is a name for the connection. This allows you to build pools
  106. for multiple databases. If you don't provide one then you can only
  107. pool one database.
  108. """
  109. self.app = app
  110.  
  111. # this is how we will find the database connection client identifier
  112. # for this request. this lets the library ensure that it is handing out
  113. # the same connection for the duration of the request.
  114. self.key = "db_client_key[{}]".format(key)
  115.  
  116. # initialize the connection pool
  117. self.pool = ConnectionPool(
  118. minconn=minconn,
  119. maxconn=maxconn,
  120. cursor_factory=RealDictCursor,
  121. **kwargs,
  122. )
  123.  
  124. # this will clean up the connection when it is done
  125. self.app.teardown_request(self.close)
  126.  
  127. def conn(self):
  128. """
  129. This function should be used by your Flask views to get a connection
  130. to the database. It will always return a valid connection and will
  131. always return the same connection to the same request. It will only
  132. throw an exception if the pool is full.
  133. """
  134. # loop until we have a database connection
  135. db_client = None
  136. while (db_client is None):
  137. # see if we have a database client identifier for this request
  138. # already. if we have a client identifier then get the connection
  139. # associated with that identifier and test if it is still alive. if
  140. # it is alive then return it. if it is not alive then raise an
  141. # exception because we want to return the same connection through
  142. # an entire request. if we do NOT have a client identifier then
  143. # get a connection and test it until we get a connection that is
  144. # alive.
  145. db_client_id = None
  146. if (hasattr(g, self.key)):
  147. # try to get a connection with this client id
  148. db_client_id = str(getattr(g, self.key))
  149. db_client = self._get_connection(db_client_id)
  150.  
  151. # no connection returned for the request's client identifier so
  152. # the connection is dead and we can't do anything.
  153. if (db_client is None):
  154. delattr(g, self.key) # remove client identifier
  155. raise PoolError("request connection lost")
  156.  
  157. # actually the client identifier returned a valid connection
  158. return db_client
  159.  
  160. # try to get a connection with a new identifier
  161. db_client_id = str(uuid.uuid4())
  162. db_client = self._get_connection(db_client_id)
  163.  
  164. # the connection that we got was valid so let's save the identifier
  165. # and return the connection. (if it wasn't valid then we'll just
  166. # repeate the loop which is a-ok.)
  167. if (db_client is not None):
  168. # do anything with a new connection here. for example, maybe
  169. # you want to set a configuration value that use the person's
  170. # username in it. i don't know.
  171. # TODO
  172.  
  173. # then attach the connection to the request global
  174. setattr(g, self.key, db_client_id)
  175. return db_client
  176.  
  177. def close(self, exception):
  178. # this gets called when a request is finished, regardless of the state
  179. # of the request (e.g. success [2xx] or failure [4xx, 5xx])
  180. if (hasattr(g, self.key)):
  181. try:
  182. db_client_id = getattr(g, self.key)
  183. self.pool.putconn(db_client_id)
  184. logger.debug("returned connection {} to pool named {}".format(db_client_id, self.key))
  185. except (PoolError, KeyError) as e:
  186. logger.error("could not return connection to pool: {}".format(repr(e)))
  187.  
  188. def _get_connection(self, db_client_id):
  189. db_client = self.pool.getconn(db_client_id)
  190.  
  191. try:
  192. logger.debug("testing connection {} from pool named {}".format(db_client_id, self.key))
  193.  
  194. # test the connection before giving it back to ensure it works.
  195. # if it doesn't work then we're going to close it and try to
  196. # get a different connection until we find one that works.
  197. cur = db_client.cursor()
  198. cur.execute("SELECT pg_backend_pid()")
  199. cur.close()
  200. except Exception as e:
  201. logger.warning("connection {} from pool named {} failed: {}".format(db_client_id, self.key, e))
  202.  
  203. # we do not have a valid connection so put it back and close it
  204. # and set our current db_client to None so that our next time
  205. # around the loop will attempt to get a new connection.
  206. self.pool.putconn(db_client_id, close=True)
  207.  
  208. # the connection was bad
  209. return
  210. else:
  211. logger.debug("using connection {} from pool named {}".format(db_client_id, self.key))
  212.  
  213. # the connection was good
  214. return db_client
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement