Advertisement
Guest User

Untitled

a guest
Mar 24th, 2016
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.30 KB | None | 0 0
  1. #coding=utf-8
  2.  
  3. import greenify
  4. greenify.greenify()
  5. from greenify import spawn
  6.  
  7. import logging
  8. import time
  9. import datetime
  10. from collections import deque
  11. from functools import wraps
  12. import itertools
  13. import torndb
  14. from torndb import Row
  15. from tornado.gen import coroutine, Return
  16.  
  17. assert greenify.patch_lib("/usr/lib64/mysql/libmysqlclient_r.so")
  18.  
  19.  
  20. def enable_debug():
  21. import inspect
  22. import greenlet
  23. import logging
  24.  
  25. def trace_green(event, args):
  26. src, target = args
  27. if event == "switch":
  28. logging.info("from %s switch to %s" % (src, target))
  29. elif event == "throw":
  30. logging.info("from %s throw exception to %s" % (src, target))
  31. if src.gr_frame:
  32. tracebacks = inspect.getouterframes(src.gr_frame)
  33. buff = []
  34. for traceback in tracebacks:
  35. srcfile, lineno, func_name, codesample = traceback[1:-1]
  36. trace_line = '''File "%s", line %s, in %s\n%s '''
  37. codesample = "".join(codesample) if codesample else ""
  38. buff.append(trace_line % (srcfile, lineno, func_name, codesample))
  39.  
  40. #logging.info("".join(buff))
  41. logging.info(buff[0])
  42.  
  43. greenlet.settrace(trace_green)
  44. #enable_debug()
  45.  
  46.  
  47. class ConnectionError(Exception):
  48. pass
  49.  
  50.  
  51. def cov(row):
  52. return tuple(isinstance(v, (datetime.datetime, datetime.date)) and str(v) or v for v in row)
  53.  
  54.  
  55. class Connection(torndb.Connection):
  56. def query(self, query, *parameters, **kwparameters):
  57. """Returns a row list for the given query and parameters."""
  58. cursor = self._cursor()
  59. try:
  60. self._execute(cursor, query, parameters, kwparameters)
  61. column_names = [d[0].lower() for d in cursor.description]
  62. return [Row(itertools.izip(column_names, cov(row))) for row in cursor]
  63. finally:
  64. cursor.close()
  65.  
  66. def multi_execute(self, full_sql, *args):
  67. sql_list = full_sql.split(';')
  68. if sql_list[-1].strip() == '':
  69. sql_list = sql_list[:-1]
  70. offset = 0
  71. cursor = self._cursor()
  72. cursor.execute("BEGIN")
  73. try:
  74. for sql in sql_list:
  75. arg_count = sql.count('%s')
  76. cur_args = args[offset: offset + arg_count]
  77. cursor.execute(sql, cur_args) # 不可用 *cur_args ,torndb 这里只接受两个参数
  78. offset += arg_count
  79. cursor.execute("COMMIT")
  80. except Exception as e:
  81. cursor.execute("ROLLBACK")
  82. finally:
  83. cursor.close()
  84.  
  85.  
  86. class AsyncDBConnection(object):
  87. def __init__(self, host, port, database, user, password, charset='utf8', time_zone='+8:00', max_connection=20):
  88. self.__host = host
  89. self.__port = port
  90. self.__database = database
  91. self.__user = user
  92. self.__password = password
  93. self.__charset = charset
  94. self.__time_zone = time_zone
  95. self.__max_connection = max_connection
  96.  
  97. self.__idle_queue = deque()
  98. self.__busy_queue = deque()
  99. self.idle_queue = self.__idle_queue
  100. self.busy_queue = self.__busy_queue
  101.  
  102. self.__op_times = 0
  103.  
  104. # add by WuLin
  105. def disconnect_all(self):
  106. for conn in self.__idle_queue:
  107. spawn(conn.close)
  108. for conn in self.__busy_queue:
  109. spawn(conn.close)
  110.  
  111. @coroutine
  112. def __get_connection(self):
  113. if self.__idle_queue:
  114. c = self.__idle_queue.popleft()
  115. self.__busy_queue.append(c)
  116. raise Return(c)
  117. else:
  118. if len(self.__busy_queue)+len(self.__idle_queue) < self.__max_connection:
  119. c = yield spawn(self.connect)
  120. self.__busy_queue.append(c)
  121. raise Return(c)
  122. else:
  123. logging.error("connection exceed max:%s" % self.__max_connection)
  124. raise ConnectionError("too many connection")
  125.  
  126. def connect(self, *args, **kwargs):
  127. c = Connection(host=self.__host + ':' + str(self.__port), database=self.__database, user=self.__user,
  128. password=self.__password, time_zone=self.__time_zone, charset=self.__charset)
  129. return c
  130.  
  131. def __shrink(self, c):
  132. self.__op_times += 1
  133. idle_len = len(self.__idle_queue)
  134. busy_len = len(self.__busy_queue)
  135. flood_gate = idle_len+busy_len+self.__max_connection*self.__max_connection/(idle_len+busy_len+1)+10000
  136. if self.__op_times >= flood_gate:
  137. if idle_len >= 1:
  138. c.close()
  139. c = None
  140. return True
  141. self.__op_times = 0
  142. return False
  143.  
  144. def __recycle(self, c):
  145. try:
  146. self.__busy_queue.remove(c)
  147. if not self.__shrink(c):
  148. self.__idle_queue.append(c)
  149. except ValueError:
  150. logging.error("recycle error, there must be some thing wrong!")
  151.  
  152. @coroutine
  153. def __do_sql(self, method, sql, *args):
  154. conn = yield self.__get_connection()
  155. if hasattr(conn, method):
  156. res = yield spawn(getattr(conn, method), sql, *args)
  157. else:
  158. logging.error("__do_sql error, method not exist:%s" % method)
  159. self.__recycle(conn)
  160. raise Return(res)
  161.  
  162. def __make_sql_operation(name):
  163. def __(fn):
  164. @wraps(fn)
  165. @coroutine
  166. def __sql_op(self, sql, *args):
  167. res = yield self.__do_sql(name, sql, *args)
  168. raise Return(res)
  169. return __sql_op
  170. return __
  171.  
  172. @__make_sql_operation("query")
  173. def query(self):
  174. pass
  175.  
  176. @__make_sql_operation("get")
  177. def get(self):
  178. pass
  179.  
  180. @__make_sql_operation("insert")
  181. def insert(self):
  182. pass
  183.  
  184. @__make_sql_operation("execute")
  185. def execute(self):
  186. pass
  187.  
  188. @__make_sql_operation("multi_execute") # add by WuLin
  189. def multi_execute(self):
  190. pass
  191.  
  192. @__make_sql_operation("execute_lastrowid")
  193. def execute_lastrowid(self):
  194. pass
  195.  
  196. @__make_sql_operation("execute_rowcount")
  197. def execute_rowcount(self):
  198. pass
  199.  
  200. @__make_sql_operation("executemany")
  201. def executemany(self):
  202. pass
  203.  
  204. @__make_sql_operation("executemany_lastrowid")
  205. def executemany_lastrowid(self):
  206. pass
  207.  
  208. @__make_sql_operation("executemany_rowcount")
  209. def executemany_rowcount(self):
  210. pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement