1.安装DBUtils
pip install DBUtils
2.db_helper.py的代码如下
import pymysql
from dbutils.pooled_db import PooledDB
from config import configclass DBHelper:def __init__(self):""":param mincached:连接池中空闲连接的初始数量:param maxcached:连接池中空闲连接的最大数量:param maxshared:共享连接的最大数量:param maxconnections:创建连接池的最大数量:param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理:param maxusage:单个连接的最大重复使用次数:param setsession:optional list of SQL commands that may serve to preparethe session, e.g. ["set datestyle to ...", "set time zone ..."]:param reset:how connections should be reset when returned to the pool(False or None to rollback transcations started with begin(),True to always issue a rollback for safety's sake):param host:数据库ip地址:param port:数据库端口:param db:数据库名:param user:用户名:param passwd:密码:param charset:字符编码"""mincached = 0maxcached = 0maxshared = 0maxconnections = 0blocking = Truemaxusage = 0setsession = Nonereset = Truehost = config.mysql_config["host"]port = config.mysql_config["port"]db = config.mysql_config["database"]user = config.mysql_config["user"]passwd = config.mysql_config["password"]charset = 'utf8mb4'try:self.__pool = PooledDB(pymysql,mincached, maxcached,maxshared, maxconnections, blocking,maxusage, setsession, reset,host=host, port=port, db=db,user=user, passwd=passwd,charset=charset,cursorclass=pymysql.cursors.DictCursor)except Exception as e:print(e)def get_conn(self):try:conn = self.__pool.connection()cursor = conn.cursor()return conn, cursorexcept Exception as e:print(e)@staticmethoddef dispose(cursor, conn):cursor.close()conn.close()# 返回执行execute()方法后影响的行数def execute(self, sql):try:conn, cursor = self.get_conn()cursor.execute(sql)rowcount = cursor.rowcountreturn rowcountexcept Exception as e:print(e)# 删除并返回影响行数def delete(self, **kwargs):conn, cursor = self.get_conn()table = kwargs['table']where = kwargs['where']sql = 'DELETE FROM %s where %s' % (table, where)print(sql)rowcount = 0try:# 执行SQL语句cursor.execute(sql)# 提交到数据库执行conn.commit()# 影响的行数rowcount = cursor.rowcountexcept Exception as e:print(e)# 发生错误时回滚conn.rollback()return rowcount# 新增并返回新增IDdef insert(self, **kwargs):conn, cursor = self.get_conn()table = kwargs['table']del kwargs['table']sql = 'insert into %s(' % tablefields = ""values = ""for k, v in kwargs.items():fields += "%s," % kvalues += "'%s'," % vfields = fields.rstrip(',')values = values.rstrip(',')sql = sql + fields + ")values(" + values + ")"print(sql)res = 0try:# 执行SQL语句cursor.execute(sql)# 提交到数据库执行conn.commit()# 获取自增idres = cursor.lastrowidexcept Exception as e:print(e)# 发生错误时回滚conn.rollback()return res# 修改数据并返回影响的行数def update(self, **kwargs):conn, cursor = self.get_conn()table = kwargs['table']# del kwargs['table']kwargs.pop('table')where = kwargs['where']kwargs.pop('where')sql = 'update %s set ' % tablefor k, v in kwargs.items():sql += "%s='%s'," % (k, v)sql = sql.rstrip(',')sql += ' where %s' % whereprint(sql)rowcount = 0try:# 执行SQL语句cursor.execute(sql)# 提交到数据库执行conn.commit()# 影响的行数rowcount = cursor.rowcountexcept Exception as e:print(e)# 发生错误时回滚conn.rollback()return rowcount# 查-一条条数据def selectOne(self, **kwargs):conn, cursor = self.get_conn()table = kwargs['table']field = 'field' in kwargs and kwargs['field'] or '*'where = 'where' in kwargs and 'where ' + kwargs['where'] or ''order = 'order' in kwargs and 'order by ' + kwargs['order'] or ''sql = 'select %s from %s %s %s limit 1' % (field, table, where, order)print(sql)data = Nonetry:# 执行SQL语句cursor.execute(sql)# 使用 fetchone() 方法获取单条数据.data = cursor.fetchone()except Exception as e:print(e)# 发生错误时回滚conn.rollback()return data# 查所有数据def selectAll(self, **kwargs):conn, cursor = self.get_conn()table = kwargs['table']field = 'field' in kwargs and kwargs['field'] or '*'where = 'where' in kwargs and 'where ' + kwargs['where'] or ''order = 'order' in kwargs and 'order by ' + kwargs['order'] or ''sql = 'select %s from %s %s %s ' % (field, table, where, order)print(sql)data = Nonetry:# 执行SQL语句cursor.execute(sql)# 使用 fetchone() 方法获取单条数据.data = cursor.fetchall()except Exception as e:print(e)# 发生错误时回滚conn.rollback()return data
3.config.py文件中mysql连接配置
mysql_config = {"host": "localhost", # MySQL服务器地址"port": 3066,"user": "root", # 用户名"password": "123456", # 密码"database": "my_db","charsets": "UTF8"
}
4. service中使用
代码如下:
db = DBHelper() # PooledDB初始化一次,不要多次初始化class MyService:# 多线程中调用该方法即可def select_data(self):conn, cursor=db.get_conn()cursor.execute('SELECT * FROM data')for row in cursor.fetchall():print(row['id'])
参考资料
https://blog.csdn.net/mtj66/article/details/125501757
https://blog.csdn.net/sinat_28984567/article/details/105427152