目录
- 前言
- 封装代码
- 测试代码
- 参考
前言
协程异步操作MYSQL是常用的,博主这里在GitHub上找了两个包,databases和aiomysql,第一个包除了mysql外还支持其他的数据库,且操作MYSQL时底层也是使用的aiomysql,但文档内容比较少,所以选择了对aiomysql进行封装。使用
pip3 install aiomysql
安装即可。
封装代码
async_mysql.py
# coding:utf-8
import aiomysql
import tracebackclass AsyncMySQLClient:def __init__(self, host: str, user: str, password: str, db: str, port: int = 3306, loop=None):self.host = hostself.port = portself.user = userself.password = passwordself.db = dbself.loop = loopself.pool = None# 创建连接池async def connect(self):try:self.pool = await aiomysql.create_pool(host=self.host,port=self.port,user=self.user,password=self.password,db=self.db,loop=self.loop,autocommit=True)except:print(f"connect error:{traceback.format_exc()}")# 关闭连接池async def close(self):self.pool.close()await self.pool.wait_closed()# 执行一条非查询类语句async def execute(self, query:str, args:tuple=None) -> int:async with self.pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(query, args)return cur.rowcount# 批量执行非查询类语句async def executemany(self, query:str, args:list=None) -> int:async with self.pool.acquire() as conn:async with conn.cursor() as cur:await cur.executemany(query, args)return cur.rowcount# 查询单条数据async def fetchone(self, query:str, args:tuple=None) -> dict:async with self.pool.acquire() as conn:async with conn.cursor(aiomysql.DictCursor) as cur:await cur.execute(query, args)return await cur.fetchone()# 查询多条数据async def fetchall(self, query:str, args=None) -> list:async with self.pool.acquire() as conn:async with conn.cursor(aiomysql.DictCursor) as cur:await cur.execute(query, args)return await cur.fetchall()# 封装单条插入async def insert(self, table: str, data: dict) -> int:""":param table: 表名:param data: 数据:return:"""keys = ','.join(data.keys())values = ','.join(['%s'] * len(data))query = f'INSERT INTO {table} ({keys}) VALUES ({values})'try:return await self.execute(query, tuple(data.values()))except:print(f"execute {query} with {data} failed, error{traceback.format_exc()}")return 0# 封装批量插入async def insert_many(self, table:str, data_list:list)->int:""":param table: 表名:param data_list: 数据列表:return:"""keys = ','.join(data_list[0].keys())values = ','.join(['%s'] * len(data_list[0]))query = f'INSERT INTO {table} ({keys}) VALUES ({values})'args = [tuple(data.values()) for data in data_list]try:return await self.executemany(query, args)except:print(f"execute {query} with {args} failed, error{traceback.format_exc()}")return 0
非查询类的返回影响行数,查询类的返回数据,使用字典(或字典为元素的列表)。对于常用的插入和查询进行了封装,其他的使用execute函数即可。
使用时生成类的对象,之后connect,执行语句后close即可。
测试代码
import asyncio
import uuid
from async_mysql import AsyncMySQLClientasync def main():cli = AsyncMySQLClient(host="127.0.0.1",user="root",password="12345678",db="test")await cli.connect()nums = await cli.insert(table="users",data={"id": uuid.uuid4(),"name": "lady_killer9"})print(f"插入{nums}条数据")random_id = uuid.uuid4()nums = await cli.insert_many(table="users",data_list=[{"id": random_id,"name": "lady_killer9"}])print(f"插入{nums}条数据")user = await cli.fetchone(query="select * from users where id = %s",args=(random_id,))print(f"查询到用户:{user}")users = await cli.fetchall(query="select * from users where name = 'lady_killer9'")print(f"所有用户:{users}")num = await cli.execute(query="update users set name='lady_killer8' where id = %s",args=(random_id,))if num == 1:print("修改成功")nums = await cli.execute(query="delete from users")print(f"删除{nums}条数据")await cli.close()if __name__ == '__main__':asyncio.run(main())
截图如下:
参考
databases
aiomysql
更多python相关内容,参考:【python总结】python学习框架梳理