提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 一、mysql连接池?
- 二、使用步骤
- 1.引入库
前言
提示:这里可以添加本文要记录的大概内容:
例如:
提示:以下是本篇文章正文内容,下面案例可供参考
一、mysql连接池?
安装包 DBUtils
pip install DBUtils==1.3
二、使用步骤
1.引入库
代码如下(示例):
# -*- coding:utf-8 -*-
# author: cai bao jun
# datetime: 2024/3/1 11:38
# @File: 4数据库操作2.pyimport pymysql
from DBUtils.PooledDB import PooledDB
import datetimefrom logger import logger#### DBUtils 1.3
#### DBUtils 1.3
#### DBUtils 1.3class MysqlConfig(object):database = "test2022" # 测试 trainerNhost = "127.0.0.1"user = "root"port = 3306password = "root"# Mysql数据库相关操作
# @Singleton
class DMLMysql(object):_pool = None_isinstance = None_flag = Truedef __new__(cls, *args, **kwargs):if not cls._isinstance:print('new')cls._pool = PooledDB(creator=pymysql, # 使用链接数据库的模块mincached=10, # 初始化时,链接池中至少创建的链接,0表示不创建maxconnections=200, # 连接池允许的最大连接数,0和None表示不限制连接数blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错host=MysqlConfig.host,port=MysqlConfig.port,user=MysqlConfig.user,password=MysqlConfig.password,database=MysqlConfig.database,)cls._isinstance = super().__new__(cls)return cls._isinstancedef __init__(self, host=MysqlConfig.host, database=MysqlConfig.database, user=MysqlConfig.user, password=MysqlConfig.password, port=MysqlConfig.port):try:# print('开始链接mysql22332')self.database = databaseself.pool = DMLMysql._poolexcept Exception as e:logger.error(f"database connect error message is {str(e)}")passpassdef open(self):self.conn = self.pool.connection()self.cursor = self.conn.cursor() # 表示读取的数据为字典类型return self.conn, self.cursordef close(self, cursor, conn):cursor.close()conn.close()def execute_sql(self, sqlQuery, value):""":param sqlQuery: 拼接好的sql语句:param value: 需要拼接的值:return:"""try:conn, cursor = self.open()conn.ping(reconnect=True) # 超时断开重连cursor.execute(sqlQuery, value)# logger.info('数据执行成功!')except Exception as e:logger.error(f"database name is {self.database} error info is:{str(e)},sql is : {sqlQuery}")conn.rollback()else:conn.commit()finally:self.close(cursor, conn)def select_sql(self,sqlQuery, value):ret = Nonetry:conn, cursor = self.open()conn.ping(reconnect=True) # 超时断开重连cursor.execute(sqlQuery, value)ret = cursor.fetchall()# logger.info('查询数据执行成功!')except Exception as e:logger.error(f"database name is {self.database} error info is:{str(e)},sql is : {sqlQuery}")# self.conn.rollback()finally:self.close(cursor, conn)return retdef __del__(self):# self.cursor.close()# self.conn.close()# print('关闭mysql22332')passif __name__ == '__main__':dml = DMLMysql()select_sql = 'select author_id,category_id,views from article where id=%s'value = (1,)ret1 = dml.select_sql(sqlQuery=select_sql,value=value)print(ret1)dml1 = DMLMysql()dml2 = DMLMysql()print(id(dml1))print(id(dml2))print(id(dml1)==id(dml2))pass