目录:
- 每篇前言:
- DBUtils库
- 模式一(底层使用threading.local实现):
- 模式二:
- Flask中使用
- 方式一:直接将DBUtils初始化放到settings.py文件中
- 方式二:从utils文件夹中导入
- 脚本使用DBUtils代码demo:
每篇前言:
🏆🏆作者介绍:【孤寒者】—CSDN全栈领域优质创作者、HDZ核心组成员、华为云享专家Python全栈领域博主、CSDN原力计划作者
- 🔥🔥本文已收录于Flask框架从入门到实战专栏:《Flask框架从入门到实战》
- 🔥🔥热门专栏推荐:《Python全栈系列教程》、《爬虫从入门到精通系列教程》、《爬虫进阶+实战系列教程》、《Scrapy框架从入门到实战》、《Flask框架从入门到实战》、《Django框架从入门到实战》、《Tornado框架从入门到实战》、《前端系列教程》。
- 📝📝本专栏面向广大程序猿,为的是大家都做到Python全栈技术从入门到精通,穿插有很多实战优化点。
- 🎉🎉订阅专栏后可私聊进一千多人Python全栈交流群(手把手教学,问题解答); 进群可领取Python全栈教程视频 + 多得数不过来的计算机书籍:基础、Web、爬虫、数据分析、可视化、机器学习、深度学习、人工智能、算法、面试题等。
- 🚀🚀加入我一起学习进步,一个人可以走的很快,一群人才能走的更远!
引子:
在上一个demo项目中,登录部分验证是直接写死的,本文模拟实际生产,查询MySQL数据库做验证。
一个很low的方法是:
项目根目录下创建utils/sql.py:
import pymysqlclass SQLHelper(object):@staticmethoddef open():conn = pymysql.connect(host='127.0.0.1', port=3306, password='123456', db='UserInfo')cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)return conn, cursor@staticmethoddef close(conn, cursor):conn.commit()cursor.close()conn.close()@classmethoddef fetch_one(cls, sql, args):conn, cursor = cls.open()cursor.execute(sql, args)obj = cursor.fetchone()cls.close(conn, cursor)return obj
上面这种用法很严重且明显的一个问题是: 每次登录一次都要和数据库创建一个连接!
解决方法就是使用DBUtils三方库:
DBUtils库
pip install DBUtils==1.3
DBUtils 是一套用于管理数据库连接池的Python包,为高频度高并发的数据库访问提供更好的性能,可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装。
这种连接池有两种连接模式:
-
PersistentDB:提供线程专用的数据库连接,并自动管理连接。
为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭!
-
PooledDB:提供线程间可共享的数据库连接,并自动管理连接。
创建一批连接到连接池,供所有线程共享使用。
PS:由于pymysql的threadsafety值为1,而DBUtils库用的内部又是用的pymysql,所以该模式连接池中的线程会被所有线程共享。
实测证明 PersistentDB 的速度是最高的(即第一种模式),但是在某些特殊情况下,数据库的连接过程可能异常缓慢,而此时的PooledDB(即模式二,所以推荐这个)则可以提供相对来说平均连接时间比较短的管理方式。
模式一(底层使用threading.local实现):
- 了解即可~~~
from DBUtils.PersistentDB import PersistentDB
import pymysqlPOOL = PersistentDB(creator=pymysql, # 连接数据库的模块maxusage=None, # 一个数据库连接最多被重复使用的次数,None表示无限制setsession=[], # 开始会话前执行的命令列表。如["set time zone ..."]ping=0, # ping MySQL客户端,检查服务是否可用。 【一般用0,4,7】# 0 = None = never; 1 = default = whenever if is requested; 2 = when a cursor is created; 4 = when a query is executed; 7 = alwayscloseable=False, # False:conn.close()实际上被忽略,供下次使用,在线程关闭时,才会自动关闭连接;True:conn.close()则关闭连接,再次调用就是一个新的连接了threadlocal=None, # 本线程独享值的对象,用于保存链接对象,如果链接对象被重置,也清除host='127.0.0.1',port=6379,user='root',password='123456',database='UserInfo',charset='utf8'
)
使用:
def demo():conn = POOL.connection(shareable=False)cursor = conn.cursor()cursor.execute('select * from users')result = cursor.fetchall()cursor.close() conn.close()
模式二:
- 用这个~~~
from DBUtils.PooledDB import PooledDB
import pymysqlPOOL = PooledDB(creator=pymysql, # 连接数据库的模块maxconnections=6, # 连接池允许的最大连接数, 0和None表示不限制mincached=2, # 初始化时,连接池中至少创建的空闲的连接,0表示不创建maxcached=5, # 连接池中最多闲置的连接,0和None不限制maxshared=3, # 连接池中最多共享的连接数量,0和None表示全部共享【默认为0,而且哪怕设置别的值也无用!!!下面会将为啥】blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True:等待;False:不等待直接报错maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制setsession=[], # 开始会话前执行的命令列表。如["set time zone ..."]ping=0, # ping MySQL客户端,检查服务是否可用。 【一般用0,4,7】# 0 = None = never; 1 = default = whenever if is requested; 2 = when a cursor is created; 4 = when a query is executed; 7 = alwayshost='127.0.0.1',port=6379,user='root',password='123456',database='UserInfo',charset='utf8'
)
-
为什么设置maxshared参数的值是无用的!因为DBUtils使用的pymysql,而pymysql内部threadsafety值为1。看源码:
import pymysql
DBUtils源码:
from DBUtils.PooledDB import PooledDB
使用:
def demo():"""检测当前正在运行连接数是否小于最大连接数;如果不小于则等待或报错:raise TooManyConnection异常否则则优先去初始化时创建的连接中获取连接:SteadyDBConnection然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。如果最开始创建的连接没有连接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。一旦关闭连接后,连接就返回到连接池让后续线程继续使用~"""conn = POOL.connection()cursor = conn.cursor()cursor.execute('select * from users')result = cursor.fetchall()conn.close()
Flask中使用
方式一:直接将DBUtils初始化放到settings.py文件中
方式二:从utils文件夹中导入
脚本使用DBUtils代码demo:
# coding=utf-8
"""
使用DBUtils数据库连接池中的连接,操作数据库
"""
import datetime
import pymysql
from DBUtils.PooledDB import PooledDBclass MysqlClient(object):def __init__(self, **kwargs):self.pool = self.create_pool(**kwargs)self.connection = Noneself.cursor = Nonedef create_pool(self, **kwargs):return PooledDB(pymysql,mincached=kwargs.get('mincached', 10),maxcached=kwargs.get('maxcached', 20),maxshared=kwargs.get('maxshared', 10),maxconnections=kwargs.get('maxconnections', 200),blocking=kwargs.get('blocking', True),maxusage=kwargs.get('maxusage', 100),setsession=kwargs.get('setsession', None),reset=kwargs.get('reset', True),host=kwargs.get('host', '127.0.0.1'),port=kwargs.get('port', 3306),db=kwargs.get('db', 'mysqldemo'),user=kwargs.get('user', 'root'),passwd=kwargs.get('passwd', '123456'),charset=kwargs.get('charset', 'utf8mb4'),cursorclass=pymysql.cursors.DictCursor)def get_conn_cursor(self):self.connection = self.pool.connection()self.cursor = self.connection.cursor()def close(self):try:if self.cursor:self.cursor.close()if self.connection:self.connection.close()except Exception as e:print(e)def execute(self, sql, param=()):try:self.get_conn_cursor()count = self.cursor.execute(sql, param)print(count)return countfinally:self.close()def __dict_datetime_obj_to_str(self, result_dict):"""把字典里面的datetime对象转成字符串,使json转换不出错"""if result_dict:return {k: v.__str__() if isinstance(v, datetime.datetime) else v for k, v in result_dict.items()}return result_dictdef select_one(self, sql, param=()):"""查询单个结果"""try:self.get_conn_cursor()count = self.execute(sql, param)result = self.cursor.fetchone()result = self.__dict_datetime_obj_to_str(result)return count, resultfinally:self.close()def select_many(self, sql, param=()):"""查询多个结果"""try:self.get_conn_cursor()count = self.execute(sql, param)result = self.cursor.fetchall()result = [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]return count, resultfinally:self.close()def begin(self):"""开启事务"""try:self.get_conn_cursor()self.connection.autocommit(False)except Exception as e:print(e)def end(self, option='commit'):"""结束事务"""try:if option == 'commit':self.connection.commit()else:self.connection.rollback()except Exception as e:print(e)finally:self.connection.autocommit(True)if __name__ == "__main__":mc = MysqlClient()sql1 = 'SELECT * FROM customers WHERE customerNumber = 103'result1 = mc.select_one(sql1)print(result1[1])sql2 = 'SELECT * FROM customers WHERE customerNumber IN (%s,%s,%s)'param = (103, 144, 145)print(mc.select_many(sql2, param)[1])