Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pymysql.cursors
- import configparser
- #连接配置信息示例
- config = {
- 'host':'localhost',
- 'port':3306,
- 'user':'root',
- 'password':'123',
- 'db':'nsbd',
- 'charset':'utf8mb4',
- 'cursorclass':pymysql.cursors.DictCursor,
- }
- class Sql:
- '''Conn模块中的Sql类是搭配DataSource类使用的,此类做连接实例对象'''
- def __init__(self, conf = None, conn_pool = None):
- '''conf是接收从DataSource传递过来的配置参数,conn_pool是接收传递过来的连接池'''
- if not isinstance(conn_pool, DataSource):
- raise Exception('请配置连接池给连接实例')
- else:
- self.conn_list = conn_pool.conn_list
- self.config = conf
- self.connection = pymysql.connect(**self.config)
- def close(self):
- '''打开一个连接后的关闭连接操作'''
- self.conn_list.append(self)#把操作完成的连接实例在追加到连接池的末尾
- def __createPybean(self, class_path):
- path_list = class_path.split('.')
- module_name = path_list[-2] # 模块名
- class_name = path_list[-1] # 类名
- p_m_name = '.'.join(path_list[:-1]) # 包名并模块名
- module = __import__(p_m_name, fromlist=module_name)
- return getattr(module, class_name)()
- def __reflectToBean(self, data_dict, class_path):
- '''data_dict是数据字典,class_path是要映射到类名的路径'''
- bean = self.__createPybean(class_path)#动态生成该类
- bean_attr = bean.__dict__#该类的bean属性
- for key in list(data_dict.keys()):
- if ('_' + class_path.split('.')[-1] + '__' + key in bean_attr.keys()):
- getattr(bean, 'set' + key.capitalize())(data_dict[key])#获取bean的set方法
- else:
- raise Exception('属性映射失败,请确保取得的数据库表中列名与pybean的属性名一致')
- return bean
- def query(self, sql, argvs = None, class_path = None):
- '''查询'''
- global result
- try:
- with self.connection.cursor() as cursor:
- if argvs==0:
- cursor.execute(sql)
- else:
- cursor.execute(sql, argvs)
- result = cursor.fetchall()
- if(class_path!=None):
- list = result
- result = []
- for item in list:
- result.append(self.__reflectToBean(item, class_path))#进行对象映射
- self.connection.commit()
- finally:
- return result
- def insert(self, sql, argvs = None):
- try:
- with self.connection.cursor() as cursor:
- if len(argvs) == 0:
- cursor.execute(sql)
- else:
- cursor.execute(sql, argvs)
- self.connection.commit()
- except:
- self.connection.rollback()
- def update(self, sql, argvs = None):
- self.insert(sql, argvs)
- def delete(self, sql, argvs = None):
- self.insert(sql, argvs)
- def __del__(self):
- self.connection.close()
- class DataSource:
- def __init__(self, conf, max_conn_counts = 5):
- '''max_conn_count是最大连接数,conf是连接数据库的基本配置,它可以是一个字典类型也可以是一个配置文件名,
- 若是配置文件名,则配置文件中的session名必须为‘[db]’详细参考ConfigParser模块的使用'''
- print('正在初始化连接池...')
- if isinstance(conf, dict):
- self.conf = conf
- elif isinstance(conf, str):
- print('正在读取DB配置文件...')
- self.conf = {}
- self.__readConfFile(conf)
- else:
- raise Exception('conf参数必须为dict或str类型')
- if not isinstance(max_conn_counts, int):
- raise Exception("最大连接数必须为int型")
- elif max_conn_counts not in range(1,11):#连接数介于1-10之间
- raise Exception("最大连接数必须介于1-10之间")
- else:
- print('连接池初始化连接开始...')
- #连接池的存储连接列表
- self.conn_list = []
- for i in range(0, max_conn_counts):
- self.conn_list.append(Sql(conf = self.conf, conn_pool = self))
- print('连接池初始化连接结束...')
- def __readConfFile(self, filePath):
- cp = configparser.ConfigParser()
- cp.read(filePath, encoding='utf-8')#编码设置为utf-8
- self.conf['host'] = cp.get('db', 'host')
- self.conf['port'] = cp.getint('db', 'port')
- self.conf['user'] = cp.get('db', 'user')
- self.conf['password'] = cp.get('db', 'password')
- self.conf['db'] = cp.get('db', 'db')
- self.conf['charset'] = 'utf8mb4' #默认编码UTF-8
- self.conf['cursorclass'] = pymysql.cursors.DictCursor #默认游标
- def getConnect(self):
- if len(self.conn_list) == 0:
- raise Exception('连接池中目前没有可用的连接,请稍后再试')
- else:
- return self.conn_list.pop(0)#从连接池中取出最前面的一个连接实例
- def deleteAll(self):
- '''在程序运行时关闭连接池'''
- print('正在注销连接池')
- for conn in self.conn_list:#在删除连接池的时候调用析构函数并调用连接实例的析构函数来关闭所有连接
- conn.__del__()
- self.__del__()
- print('连接池注销完成')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement