最近使用它交互数据库,想实现类似java aop那种自动事务控制,不用手动commit或者rollback。我是用的是flask+denpendency-injecter
这是我的db的配置类,里面会初始化一些session配置,里面比较重要的是把autocommit和autoflush关闭了,因为我们的代码会来处理这个,还有就是把expire_on_commit设置为flase,否则你commit之后,再取用某个entity就会报错了,例如你新建了一个entity,这个时候会更新他的id,返回给前端的时候就会报错了(Error Messages — SQLAlchemy 2.0 Documentation)。
"""Database module."""from contextlib import contextmanager, AbstractContextManager
from typing import Callablefrom sqlalchemy import create_engine, orm
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
Base = declarative_base()class DatabaseConfig:def __init__(self, db_url: str) -> None:self._engine = create_engine(db_url, echo=True)self._session_factory = orm.scoped_session(orm.sessionmaker(autocommit=False,autoflush=False,expire_on_commit=False,bind=self._engine,),)def create_database(self) -> None:Base.metadata.create_all(self._engine)@contextmanagerdef session(self) -> Callable[..., AbstractContextManager[Session]]:session: Session = self._session_factory()try:yield sessionexcept Exception:session.rollback()raiseelse:if session._transaction.is_active:session.commit()session.close()
然后comtextmanger里面就是我们的处理代码了,我们主要依靠with代码块来控制,在yield之前的属于__init__,在yield之后属于__exit__,也就是当with代码块结束之前,如果发生任何报错,我们都会进行rollback操作,并且raise(这部分需要error handler来做了,这里就不赘述了),然后如果什么错误都没有发生,就检测transaction是否还是active,如果是就commit,然后关闭session。
然后在Container中注入session contextmanager。
class Container(containers.DeclarativeContainer):wiring_config = containers.WiringConfiguration(packages=["main"])config = providers.Configuration(yaml_files=["config.yml"])db=providers.Singleton(DatabaseConfig,db_url=config.db.url)user_repository = providers.Factory(UserRepositoryImpl)user_service = providers.Factory(UserService,user_repository=user_repository,session_factory=db.provided.session)
然后再service层使用with代码块控制transation ,整个逻辑包含在同一个with中就行了。
class UserService:@injectdef __init__(self, user_repository: UserRepository, session_factory: Callable[..., AbstractContextManager[Session]]) -> None:self._repository: UserRepository = user_repositoryself.session_factory=session_factorydef create_user(self,user) -> User:with self.session_factory() as session:return self._repository.add(session=session,user=user)
然后在repo里面写具体代码就行了
class UserRepositoryImpl(UserRepository):def __init__(self) -> None:passdef add(self, user,session):session.add(user)return user