Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #coding=utf-8
- import greenify
- greenify.greenify()
- from greenify import spawn
- import logging
- import time
- import datetime
- from collections import deque
- from functools import wraps
- import itertools
- import torndb
- from torndb import Row
- from tornado.gen import coroutine, Return
# Ask greenify to patch the MySQL client shared library and abort the import
# if it reports failure.  This is an explicit check rather than a bare
# ``assert`` because ``python -O`` strips assert statements, which would also
# strip the patch_lib() call itself.  AssertionError is kept so the failure
# type seen by importers is unchanged.
if not greenify.patch_lib("/usr/lib64/mysql/libmysqlclient_r.so"):
    raise AssertionError(
        "greenify.patch_lib failed for /usr/lib64/mysql/libmysqlclient_r.so"
    )
def enable_debug():
    """Install a greenlet trace hook that logs every switch/throw event.

    For each event the hook logs the source and target greenlets.  When the
    source greenlet has a frame, it also logs that frame (file, line,
    function name and code line) in a traceback-like format.
    """
    import inspect

    import greenlet

    def trace_green(event, args):
        src, target = args
        if event == "switch":
            logging.info("from %s switch to %s", src, target)
        elif event == "throw":
            logging.info("from %s throw exception to %s", src, target)

        frame = src.gr_frame
        if frame:
            # Only the source greenlet's own frame is logged, so resolve just
            # that one instead of formatting every outer frame on each switch.
            srcfile, lineno, func_name, codesample = inspect.getframeinfo(frame)[:4]
            logging.info(
                'File "%s", line %s, in %s\n%s ',
                srcfile,
                lineno,
                func_name,
                "".join(codesample) if codesample else "",
            )

    greenlet.settrace(trace_green)
- #enable_debug()
class ConnectionError(Exception):
    """Raised when the pool cannot hand out a connection.

    NOTE: on Python 3 this name shadows the built-in ``ConnectionError``
    inside this module; it derives directly from ``Exception``.
    """
def cov(row):
    """Return *row* as a tuple with date/datetime values rendered as strings.

    ``datetime.datetime`` and ``datetime.date`` values are replaced by
    ``str(value)``; every other value is passed through unchanged.
    """
    return tuple(
        str(v) if isinstance(v, (datetime.datetime, datetime.date)) else v
        for v in row
    )
class Connection(torndb.Connection):
    """torndb connection with normalised query rows and a multi-statement helper."""

    def query(self, query, *parameters, **kwparameters):
        """Return a list of ``Row`` objects for the given query and parameters.

        Column names are lower-cased and each row is passed through ``cov``
        before being paired with the column names.
        """
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            column_names = [d[0].lower() for d in cursor.description]
            # Builtin zip works on Python 2 and 3 (itertools.izip is 2-only).
            return [Row(zip(column_names, cov(row))) for row in cursor]
        finally:
            cursor.close()

    def multi_execute(self, full_sql, *args):
        """Run several ``;``-separated statements between BEGIN and COMMIT.

        ``args`` is the flat sequence of parameters for all statements; each
        statement consumes as many of them, in order, as it contains ``%s``
        placeholders.  If any statement fails, ROLLBACK is issued, the error
        is logged and the exception is re-raised so the caller can tell that
        nothing was committed.

        Limitation: statements are split on every ``;`` and placeholders are
        counted textually, so a ``;`` or ``%s`` inside a string literal is
        misinterpreted.
        """
        # Skip blank fragments (e.g. after a trailing ';').
        statements = [sql for sql in full_sql.split(';') if sql.strip()]
        offset = 0
        cursor = self._cursor()
        try:
            cursor.execute("BEGIN")
            try:
                for sql in statements:
                    arg_count = sql.count('%s')
                    cur_args = args[offset: offset + arg_count]
                    # Do not unpack with *cur_args: execute() only accepts
                    # two arguments here (the query and its parameters).
                    cursor.execute(sql, cur_args)
                    offset += arg_count
                cursor.execute("COMMIT")
            except Exception:
                logging.exception("multi_execute failed, rolling back")
                cursor.execute("ROLLBACK")
                raise
        finally:
            cursor.close()
class AsyncDBConnection(object):
    """Pool of ``Connection`` objects used from tornado coroutines.

    Connections live in an idle queue and a busy queue.  Every SQL operation
    takes a connection (opening a new one while fewer than ``max_connection``
    exist), runs the call through ``spawn`` and then hands the connection
    back to the pool.
    """

    def __init__(self, host, port, database, user, password,
                 charset='utf8', time_zone='+8:00', max_connection=20):
        self.__host = host
        self.__port = port
        self.__database = database
        self.__user = user
        self.__password = password
        self.__charset = charset
        self.__time_zone = time_zone
        self.__max_connection = max_connection
        self.__idle_queue = deque()
        self.__busy_queue = deque()
        # Public aliases of the two queues (same deque objects).
        self.idle_queue = self.__idle_queue
        self.busy_queue = self.__busy_queue
        # Number of recycled operations since the last reset; see __shrink.
        self.__op_times = 0

    # added by WuLin
    def disconnect_all(self):
        """Schedule ``close()`` via ``spawn`` for every idle and busy connection.

        The queues themselves are left untouched.
        """
        for conn in self.__idle_queue:
            spawn(conn.close)
        for conn in self.__busy_queue:
            spawn(conn.close)

    @coroutine
    def __get_connection(self):
        """Take an idle connection, or open a new one, and mark it busy.

        Raises ``ConnectionError`` when ``max_connection`` connections
        already exist and none of them is idle.
        """
        if self.__idle_queue:
            c = self.__idle_queue.popleft()
        elif len(self.__busy_queue) + len(self.__idle_queue) < self.__max_connection:
            # NOTE(review): the size check happens before this yield, so
            # several coroutines connecting at once can exceed the limit.
            c = yield spawn(self.connect)
        else:
            logging.error("connection exceed max:%s", self.__max_connection)
            raise ConnectionError("too many connection")
        self.__busy_queue.append(c)
        raise Return(c)

    def connect(self, *args, **kwargs):
        """Create and return a new ``Connection`` from the stored settings.

        Positional and keyword arguments are accepted but ignored.
        """
        return Connection(
            host="%s:%s" % (self.__host, self.__port),
            database=self.__database,
            user=self.__user,
            password=self.__password,
            time_zone=self.__time_zone,
            charset=self.__charset,
        )

    def __shrink(self, c):
        """Count one finished operation and possibly close ``c``.

        Once the operation counter reaches the threshold, ``c`` is closed as
        long as at least one idle connection remains; when none remains the
        counter is reset instead.

        Returns True when ``c`` was closed (the caller must not put it back
        in the idle queue), False otherwise.
        """
        self.__op_times += 1
        idle_len = len(self.__idle_queue)
        busy_len = len(self.__busy_queue)
        pool_len = idle_len + busy_len
        flood_gate = (pool_len
                      + self.__max_connection * self.__max_connection / (pool_len + 1)
                      + 10000)
        if self.__op_times < flood_gate:
            return False
        if idle_len >= 1:
            c.close()
            return True
        self.__op_times = 0
        return False

    def __recycle(self, c):
        """Move ``c`` from the busy queue back to the idle queue (or close it)."""
        try:
            self.__busy_queue.remove(c)
        except ValueError:
            logging.error("recycle error, there must be some thing wrong!")
            return
        if not self.__shrink(c):
            self.__idle_queue.append(c)

    @coroutine
    def __do_sql(self, method, sql, *args):
        """Run ``conn.<method>(sql, *args)`` on a pooled connection.

        The connection is always recycled, even when the operation raises;
        otherwise it would stay in the busy queue forever.

        Raises ``AttributeError`` if the connection has no such method, and
        ``ConnectionError`` if no connection is available.
        """
        conn = yield self.__get_connection()
        try:
            operation = getattr(conn, method, None)
            if operation is None:
                logging.error("__do_sql error, method not exist:%s", method)
                raise AttributeError("connection has no method %r" % (method,))
            res = yield spawn(operation, sql, *args)
        finally:
            self.__recycle(conn)
        raise Return(res)

    def __make_sql_operation(name):
        """Build a decorator replacing a stub with a pooled call to ``name``.

        The stub's body is never executed; it only supplies the metadata
        copied by ``wraps``.  The resulting coroutine has the signature
        ``(self, sql, *args)``.
        """
        def decorate(fn):
            @wraps(fn)
            @coroutine
            def sql_op(self, sql, *args):
                res = yield self.__do_sql(name, sql, *args)
                raise Return(res)
            return sql_op
        return decorate

    @__make_sql_operation("query")
    def query(self):
        """Coroutine: call ``query(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("get")
    def get(self):
        """Coroutine: call ``get(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("insert")
    def insert(self):
        """Coroutine: call ``insert(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("execute")
    def execute(self):
        """Coroutine: call ``execute(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("multi_execute")  # added by WuLin
    def multi_execute(self):
        """Coroutine: call ``multi_execute(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("execute_lastrowid")
    def execute_lastrowid(self):
        """Coroutine: call ``execute_lastrowid(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("execute_rowcount")
    def execute_rowcount(self):
        """Coroutine: call ``execute_rowcount(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("executemany")
    def executemany(self):
        """Coroutine: call ``executemany(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("executemany_lastrowid")
    def executemany_lastrowid(self):
        """Coroutine: call ``executemany_lastrowid(sql, *args)`` on a pooled connection."""

    @__make_sql_operation("executemany_rowcount")
    def executemany_rowcount(self):
        """Coroutine: call ``executemany_rowcount(sql, *args)`` on a pooled connection."""
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement