前言
SQLAlchemy 是一个强大的 Python SQL 工具包和对象关系映射(ORM)系统,是业内比较流行的ORM,设计非常优雅。随着其2.0版本的发布,SQLAlchemy 引入了原生的异步支持,这极大地增强了其在处理高并发和异步I/O场景下的能力。通过结合像greenlet、gevent这样的协程库,SQLAlchemy 使得异步数据库操作成为可能,从而提高了应用程序的性能和响应速度。
这里我将基于SQLAlchemy的异步支持,封装一些常用的增删改查(CRUD)操作到 https://github.com/HuiDBK/py-tools 中,以便在项目开发中更加便捷地使用。
Github: https://github.com/sqlalchemy/sqlalchemy
2.0文档:https://docs.sqlalchemy.org/en/20/index.html
简单使用
封装前,先简单介绍下如何使用 SQLAIchemy。
具体细节可以参考官网文档:https://docs.sqlalchemy.org/en/20/orm/quickstart.html
安装依赖
pip install sqlalchemy[asyncio]==2.0.20
pip install aiomysql==0.2.0
这里安装了 sqlalchemy 2.0版本,以及 aiomysql 异步数据库驱动,进行演示。
创建异步数据库引擎
from sqlalchemy.ext.asyncio import create_async_engine # db_uri = "{protocol}://{user}:{password}@{host}:{port}/{db}"db_engine = create_async_engine("mysql+aiomysql://root:123456@127.0.0.1:3306/demo")
声明数据库表映射模型
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass BaseOrmTable(DeclarativeBase):"""SQLAlchemy Base ORM Model"""__abstract__ = Trueid: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="主键ID")class UserTable(BaseOrmTable):"""用户表"""__tablename__ = "user"username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称")password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码")phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号")email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱")avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")
简单db操作
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column# db_uri = "{protocol}://{user}:{password}@{host}:{port}/{db}"db_engine = create_async_engine("mysql+aiomysql://root:123456@127.0.0.1:3306/hui-demo")Session = async_sessionmaker(db_engine)async def create_tables():# 根据映射创建库表async with db_engine.begin() as conn:await conn.run_sync(BaseOrmTable.metadata.create_all)async def main():await create_tables()async with Session.begin() as session:# 添加用户new_user = UserTable(username='hui', email='huidbk@163.com')session.add(new_user)await session.flush() # 刷新table 对象属性,获取新增的idprint(new_user.id)print("add user", new_user.__dict__)# 获取用户user = await session.get(UserTable, new_user.id)print("get user", user.__dict__)# 更新用户user.email = 'hui@163.com'await session.merge(user)print("updated user", user.__dict__)# 删除用户await session.delete(user)if __name__ == '__main__':# 运行主函数asyncio.run(main())
常用DB操作封装
SQLAlchemyManager
class SQLAlchemyManager(metaclass=SingletonMetaCls):DB_URL_TEMPLATE = "{protocol}://{user}:{password}@{host}:{port}/{db}"def __init__(self,host: str = "localhost",port: int = 3306,user: str = "",password: str = "",db_name: str = "",pool_size: int = 30,pool_pre_ping: bool = True,pool_recycle: int = 600,log: Union[logging.Logger] = None,):self.host = hostself.port = portself.user = userself.password = passwordself.db_name = db_nameself.pool_size = pool_sizeself.pool_pre_ping = pool_pre_pingself.pool_recycle = pool_recycleself.log = log or loggerself.db_engine: AsyncEngine = Noneself.async_session_maker: async_sessionmaker = Nonedef get_db_url(self, protocol: str = "mysql+aiomysql"):db_url = self.DB_URL_TEMPLATE.format(protocol=protocol, user=self.user, password=self.password, host=self.host, port=self.port, db=self.db_name)return db_urldef init_db_engine(self, protocol: str):"""初始化db引擎Args:protocol: 驱动协议类型Returns:self.db_engine"""db_url = self.get_db_url(protocol=protocol)self.log.info(f"init_db_engine => {db_url}")self.db_engine = create_async_engine(url=db_url, pool_size=self.pool_size, pool_pre_ping=self.pool_pre_ping, pool_recycle=self.pool_recycle)self.async_session_maker = async_sessionmaker(bind=self.db_engine, expire_on_commit=False)return self.db_enginedef init_mysql_engine(self, protocol: str = "mysql+aiomysql"):"""初始化mysql引擎Args:protocol: 驱动协议类型Returns:self.db_engine"""return self.init_db_engine(protocol=protocol)
SQLAlchemyManager 主要封装一些数据库账户配置信息、连接池信息。
pool_size(连接池大小): 指定连接池中允许保持的最大连接数。当应用程序需要访问数据库时,连接池会维护一定数量的数据库连接,以便快速地响应请求。通常情况下,pool_size 的值应该根据应用程序的并发访问量和数据库的性能来进行调整。
pool_pre_ping(预检查连接): 指定是否在数据库连接被使用前对连接进行预检查。预检查可以确保连接处于活动状态,并且可以自动重新连接到数据库服务器,以防止连接由于长时间空闲而失效。启用预检查可以提高应用程序对数据库的可靠性和稳定性。
pool_recycle(连接回收时间): 指定数据库连接在被重新使用之前的最大空闲时间。当连接空闲时间超过 pool_recycle 设置的值时,连接将被关闭并重新创建,以防止连接长时间处于空闲状态而导致的连接问题。pool_recycle 的值通常设置为一个较小的时间间隔,以确保连接能够及时地得到回收和重建,从而提高连接的健壮性和性能。
init_db_engine
方法则是初始化数据库引擎,内部根据数据库配置信息
- 构造异步的数据库引擎 db_engine
- 维护一个 async_session_maker 数据库会话工厂
BaseORMTable 映射库表封装
from datetime import datetime
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass BaseOrmTable(AsyncAttrs, DeclarativeBase):"""SQLAlchemy Base ORM Model"""__abstract__ = Trueid: Mapped[int] = mapped_column(primary_key=True, comment="主键ID")def __repr__(self):return str(self.to_dict())def to_dict(self, alias_dict: dict = None, exclude_none=True) -> dict:"""数据库模型转成字典Args:alias_dict: 字段别名字典 eg: {"id": "user_id"}, 把id名称替换成 user_idexclude_none: 默认排查None值Returns: dict"""alias_dict = alias_dict or {}if exclude_none:return {alias_dict.get(c.name, c.name): getattr(self, c.name)for c in self.__table__.columns if getattr(self, c.name) is not None}else:return {alias_dict.get(c.name, c.name): getattr(self, c.name, None)for c in self.__table__.columns}class TimestampColumns(AsyncAttrs, DeclarativeBase):"""时间戳相关列"""__abstract__ = Truecreated_at: Mapped[datetime] = mapped_column(default=datetime.now, comment="创建时间")updated_at: Mapped[datetime] = mapped_column(default=datetime.now, onupdate=datetime.now, comment="更新时间")deleted_at: Mapped[datetime] = mapped_column(nullable=True, comment="删除时间")class BaseOrmTableWithTS(BaseOrmTable, TimestampColumns):__abstract__ = True
创建一些基础的 ORM 类,以便后续的映射类可以继承并且共享一些公有属性和方法。
-
BaseOrmTable
类:- 定义了一个基础的 ORM 模型类,继承了
AsyncAttrs
和DeclarativeBase
。这样做使得BaseOrmTable
类具有了异步属性访问的能力,为异步编程提供便利,特别是在异步环境中访问具有延迟加载或者异步加载特性的属性。 - 提供了一个
to_dict
方法,用于将数据库模型转换为字典。它支持通过参数alias_dict
指定字段别名,并且可以选择是否排除值为 None 的属性。
- 定义了一个基础的 ORM 模型类,继承了
-
TimestampColumns
类:- 定义了一个包含时间戳相关列的抽象基类。这些列通常在很多数据库表中都会有,用于记录数据的创建时间、更新时间和删除时间。
- 这些列被设置为默认值,比如
created_at
和updated_at
默认使用datetime.now
函数来自动记录当前时间,deleted_at
则允许为空,用于标记数据的删除时间(可用作于逻辑删除)
-
BaseOrmTableWithTS
类:- 继承了
BaseOrmTable
和TimestampColumns
,实际上是一个组合类,集成了基础的 ORM 功能和时间戳相关的列。 - 这个类进一步封装了
BaseOrmTable
和TimestampColumns
,使得后续的映射类只需要继承这个类,就能够拥有基础的 ORM 功能和时间戳相关的列。
- 继承了
通过这种封装,你可以在后续的数据库映射类中更加专注于业务逻辑的实现,而不需要重复编写基础的 ORM 功能和时间戳相关的列,提高了代码的重用性和可维护性。
DBManager 数据库通用操作封装
前置封装说明
from typing import Any, List, Type, TypeVar, Union
from py_tools.connections.db.mysql import BaseOrmTable
from py_tools.meta_cls import SingletonMetaCls# 泛指 BaseOrmTable 所有子类实例对象类型
T_BaseOrmTable = TypeVar("T_BaseOrmTable", bound=BaseOrmTable)
T_Hints = TypeVar("T_Hints") # 用于修复被装饰的函数参数提示,让IDE有类型提示def with_session(method) -> T_Hints:"""兼容事务会话Args:method: orm 的 crudNotes:方法中没有带事务连接则,则构造Returns:"""@functools.wraps(method)async def wrapper(db_manager, *args, **kwargs):session = kwargs.get("session") or Noneif session:return await method(db_manager, *args, **kwargs)else:async with db_manager.transaction() as session:kwargs["session"] = sessionreturn await method(db_manager, *args, **kwargs)return wrapper
这里我提供了一个 with_session 装饰器,用于在需要数据库会话(事务)的数据库操作方法中自动开启事务,由于 sqlaichemy 官方推荐每个数据库操作都手动开启事务会话(自动提交),装饰器的设计没有时则构造,有则共享,这样不但可以减少冗余 async with db_manager.transaction() as session 的代码,也可以兼容多个操作共享同一个 session 有问题时进行事务回滚。
由于给方法加了通用的装饰器导致一些版本的IDE无法识别方法真实的签名,使用时会出现不知道方法的入参是什么,对于开发者来说是极其不方便的。
使用 typing 的 TypeVar 自定义类型来构造一个通用的泛型来当作函数返回的类型,进而修复。
from typing import TypeVar
T_Hints = TypeVar("T_Hints") # 用于修复被装饰的函数参数提示,让IDE有类型提示def with_session(method) -> T_Hints:...
这里PyCharm 2023.2.4 版本升级到 2024.1 就有提示了,IDE修复了,可以不用 T_Hints 了。
一些旧版本构造 sqlaichemy 的库表对象时也会出现不知道类对象属性入参提示,升级到最新版本都解决了。
from contextlib import asynccontextmanagerclass DBManager(metaclass=SingletonMetaCls):DB_CLIENT: SQLAlchemyManager = Noneorm_table: Type[BaseOrmTable] = None@classmethoddef init_db_client(cls, db_client: SQLAlchemyManager):cls.DB_CLIENT = db_clientreturn cls.DB_CLIENT@classmethod@asynccontextmanagerasync def transaction(cls):"""事务上下文管理器"""async with cls.DB_CLIENT.async_session_maker.begin() as session:yield session@classmethod@asynccontextmanagerasync def connection(cls) -> AsyncIterator[AsyncConnection]:"""数据库引擎连接上下文管理器"""async with cls.DB_CLIENT.db_engine.begin() as conn:yield conn
- init_db_client 方法用于初始化数据库客户端(引擎)。
- transaction 则是简单的通过 contextlib 中 asynccontextmanager 封装一个异步的上下文管理器方便简洁的开启一个数据库会话(事务)进行数据库相关操作。
- connection 数据库引擎连接上下文管理器。
- orm_table 是具体继承 DBManager 的子类进行指定的,用于操作具体的库表(orm_table)。
- DBManager 通过 SingletonMetaCls 元类实现单例模式。具体单例模式可以了解 https://juejin.cn/post/7272006755265380367 这篇文章有详细的介绍。
DB添加操作封装
class DBManager(metaclass=SingletonMetaCls):DB_CLIENT: SQLAlchemyManager = Noneorm_table: Type[BaseOrmTable] = None@with_sessionasync def bulk_add(self,table_objs: List[Union[T_BaseOrmTable, dict]],*,orm_table: Type[BaseOrmTable] = None,flush: bool = False,session: AsyncSession = None) -> List[T_BaseOrmTable]:"""批量插入Args:table_objs: orm映射类实例列表eg.[UserTable(username="hui", age=18), ...] or [{"username": "hui", "age": 18}, ...]orm_table: orm表映射类flush: 刷新对象状态,默认不刷新session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务Returns:成功插入的对象列表"""orm_table = orm_table or self.orm_tableif all(isinstance(table_obj, dict) for table_obj in table_objs):# 字典列表转成orm映射类实例列表处理table_objs = [orm_table(**table_obj) for table_obj in table_objs]session.add_all(table_objs)if flush:await session.flush(table_objs)return table_objs@with_sessionasync def add(self,table_obj: [T_BaseOrmTable, dict],*,orm_table: Type[BaseOrmTable] = None,session: AsyncSession = None) -> int:"""插入一条数据Args:table_obj: orm映射类实例对象, eg. UserTable(username="hui", age=18) or {"username": "hui", "age": 18}orm_table: orm表映射类session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务Returns: 新增的idtable_obj.id"""orm_table = orm_table or self.orm_tableif isinstance(table_obj, dict):table_obj = orm_table(**table_obj)session.add(table_obj)await session.flush(objects=[table_obj]) # 刷新对象状态,获取新增的idreturn table_obj.id
这里就是用 session.add 与 add_all 方法封装了数据库添加、批量添加的操作,封装的点主要在于除了 orm_table 实例对象入参还支持字典入参,内部还是转换成库表映射类实例来操作,最后通过 session.flush 方法,单个添加返回新增的主键id,批量添加则是返回实例对象列表。
设计的方法中有一个 * 号是参数的分隔符,它的作用是将其前面的参数声明为位置参数,而将 * 后面的参数声明为关键字参数,* 号后面的参数入参只能使用关键字形式的入参,我在很多的开源库中都看到了这样的设计,可以把一些函数语义连贯、常用必传的参数设置为位置参数,其他的则是关键字参数。这样可以明确参数的作用、提高函数的可读性、防止参数错误等。
具体看下使用案例:
import asynciofrom sqlalchemy import String
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnfrom py_tools.connections.db.mysql import BaseOrmTableWithTS, BaseOrmTable, DBManager, SQLAlchemyManagerclass UserTable(BaseOrmTableWithTS):"""用户表"""__tablename__ = "user"username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称")password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码")phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号")email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱")avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")async def create_tables():# 根据映射创建库表(异步)# async with db_engine.begin() as conn:# await conn.run_sync(BaseOrmTable.metadata.create_all)async with DBManager.connection() as conn:await conn.run_sync(BaseOrmTable.metadata.create_all)async def init_orm_manager():db_client = SQLAlchemyManager(host="127.0.0.1",port=3306,user="root",password="123456",db_name="hui-demo",)db_client.init_mysql_engine()DBManager().init_db_client(db_client)async def manager_crud():user = {"username": "hui", "email": "huidbk.163.com"}user_id = await DBManager().add(table_obj=user, orm_table=UserTable)print("user_id", user_id)users = [{"username": "zack", "email": "zack.163.com"},{"username": "wang", "email": "wang.163.com"}]add_users = await DBManager().bulk_add(table_objs=users, orm_table=UserTable)add_user_ids = [user.id for user in add_users]print("add_user_ids", add_user_ids)async def main():await create_tables()# await normal_crud()await init_orm_manager()await manager_crud()if __name__ == '__main__':# 运行主函数asyncio.run(main())
在程序启动时初始化好DBManager 的 DB_CLIENT 就可以直接使用封装的方法,主要就是 DB_CLIENT 作为类属性,后面DBManager 实例与子类实例对象都可以共享这个数据库引擎。但我这里还是不推荐上面的写法,DBManager 是一些通用的DB操作,而具体一些业务操作还是单独封装一些DB业务Manager类来进行会比较好,更利于扩展维护与复用。
class UserManager(DBManager):orm_table = UserTableasync def get_name_by_email(self, email):username = await self.query_one(cols=["username"], conds=[self.orm_table.email == email], flat=True)return usernameasync def manager_crud():# demo 2 (推荐)user = UserTable(username="hui-test01", email="hui-test01.163.com")user_id = await UserManager().add(table_obj=user)print("user_id", user_id)users = [UserTable(username="hui-test02", email="hui-test02.163.com"),UserTable(username="hui-test03", email="hui-test03.163.com"),]add_users = await UserManager().bulk_add(table_objs=users)add_user_ids = [user.id for user in add_users]print("add_user_ids", add_user_ids)username = await UserManager().get_name_by_email(email="huidbk.163.com")print("username", username)>>> out
user_id 4
add_user_ids [5, 6]
username hui
这里 UserManager 单独封装的 get_name_by_email 的方法就是业务中常用查询操作通过邮件获取用户名称,这里就是举一个简单的例子,具体DB业务具体封装而不是全部写在逻辑层,这样别人要用的时候就不用重新组织条件参数、上下文,而是简单传递业务参数进行复用获取数据。
UserManager 调用 add、bulk_add 等方法时也不用像 DBManager 指定 orm_table 参数,使用起来更简洁。具体是因为 UserManager 类指定了 类属性 orm_table = UserTable,再封装时有一句 orm_table = orm_table or self.orm_table 意思就是优先选择入参的orm_table,没有则是 self.orm_table (具体实例对象的orm_table)。这样写也体现出 封装、继承的灵活性。
这里也引出了另一个封装方法 query_one 查询单条数据。由于介绍了一些Demo如果把所有的封装方法混合到一起篇幅就太长,故而我准备分成三篇进行分别介绍,这样也更好阅读。
- SQLAIchemy 异步DBManager封装-01入门理解
- SQLAIchemy 异步DBManager封装-02熟悉掌握
- SQLAIchemy 异步DBManager封装-03得心应手
Github源代码
源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。
HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)