SqlAlchemy使用教程(七) 异步访问数据库

在这里插入图片描述

  • SqlAlchemy使用教程(一) 原理与环境搭建
  • SqlAlchemy使用教程(二) 入门示例及编程步骤
  • SqlAlchemy使用教程(三) CoreAPI访问与操作数据库详解
  • SqlAlchemy使用教程(四) MetaData 与 SQL Express Language 的使用
  • SqlAlchemy使用教程(五) ORM API 编程入门
  • SqlAlchemy使用教程(六) – ORM 表间关系的定义与CRUD操作

注:本章要求熟悉Python异步编程的基础知识

1、SqlAlchemy 异步编程基础

1.1 异步访问数据的优点

  • 当数据库访问较频繁时,异步编程可以明显提高运行效率。
  • 可以配合FastAPI 等异步框架,用异步访问使用SqlAlchemy,充分发挥异步框架的优势 。

SqlAlchmy 1.4 提供了Core层的异步接口, 2.0提供了 异步ORM接口

1.2 编程环境准备

(1) 安装异步依赖库

sqlalchemy 的异步接口基于 greenlet,在setuptools配置中为可选安装,

安装异步
pip install sqlalchemy[asyncio]

或者自已在setup.py 中查找greenlet版本号,手工
pip install greenlet==xx.yy.zz

(2)安装数据库的异步驱动。

pip install aiosqlite

以下是常见数据库的异步驱动库
sqlite3 :

  • aiosqlite

mysql:

  • aiomysql。
  • asyncmy: 这是1个支持 MySQL/MariaDB 的高性能异步库

PostgreSQL:

  • aiopg,
  • asyncpg:
  • asyncpgsa: 是asyncpg库的封装,适用于Sqlalchemy.

2、Core Async API

Core API层的异步,首先通过 create_async_engine() 创建1个异步AsyncEngine对象,该对象提供异步连接AsyncEngine.connect() , 或者通过上下文使用AsyncEngine.begin()创建的连接。
同时,还提供了AsyncConnection.run_sync() 用于执行一些内部的同步方法,如 MetaData.create_all()。

下面我们通过实现来查看1个完整过程

import asyncio
from sqlalchemy import Column, MetaData, select, String, Table
from sqlalchemy.ext.asyncio import create_async_enginemeta = MetaData()
t1 = Table('tb1',meta,Column('name', String(50), primary_key=True),Column('profile', String(50), nullable=True),
)async def main():engine = create_async_engine("sqlite+aiosqlite:///:memory:")async with engine.begin() as conn:await conn.run_sync(meta.create_all)await conn.execute(t1.insert(), [{"name": "some name 1", 'profile': 'some profile 1'},{"name": "some name 2", 'profile': 'some profile 2'}])async with engine.begin() as conn:result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))print(result.fetchall())await engine.dispose()
asyncio.run(main())

异步流式查询

使用AsyncConnection.stream() 执行SQL, 返回AsyncResult对象。

async with engine.connect() as conn:async_result = await conn.stream(select(t1))async for row in async_result:print("row: %s" % (row,))

3、异步ORM 编程API

3.1 异步ORM API介绍

ORM的异步接口主要由AsyncSession 类来提供。
AsyncSession对象由async_sessionmaker() 工厂方法来创建。
注意:1个AsyncSession 实例只能用在1个coroutine 内。 每个协程要使用不同的AsyncSession对象。

async_session = async_sessionmaker(async_engine, expire_on_commit=False)

手动关闭AsyncSession对象,
AsyncSession.close()

异步执行SQL操作
AsyncSession.execute()
AsyncSession.scalars()

对于具有relation 关系的表的操作,同步模式下存在由lazy load 带来的Implicit IO,异步模式下不支持lazy load.

3.2 完整示例

import asyncio
from typing import List
from sqlalchemy import ForeignKey, select, String, Integer
from sqlalchemy.ext.asyncio import (create_async_engine,async_sessionmaker,AsyncSession,AsyncAttrs)
from sqlalchemy.orm import (DeclarativeBase,Mapped,mapped_column,relationship,selectinload)class Base(DeclarativeBase):passclass User(Base):__tablename__ = "user"id: Mapped[int] = mapped_column(primary_key=True)name: Mapped[str] = mapped_column(String(30))age: Mapped[int] = mapped_column(Integer())company_id: Mapped[int] = mapped_column(ForeignKey("company.id"))company: Mapped["Company"] = relationship(back_populates="users")class Company(Base):__tablename__ = "company"id: Mapped[int] = mapped_column(primary_key=True)name: Mapped[str] = mapped_column(String(30))users: Mapped[List[User]] = relationship()async def insert_data(async_session: async_sessionmaker[AsyncSession]) -> None:async with async_session() as session:async with session.begin():session.add_all([Company(name="Baidu", users=[]),Company(name="Alibaba", users=[]),User(name='Tom', age=21, company_id=1),User(name='Jerry', age=22, company_id=2),User(name='Jack', age=23, company_id=1),])async def main() -> None:engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)async_session = async_sessionmaker(engine, expire_on_commit=False)# 创建表async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 使用异步协程插入数据await asyncio.gather(insert_data(async_session))# await insert_data(async_session)  # 或者直接执行 # 查询User表数据,联合查询Company表数据async with async_session() as session:stmt = select(User, Company).join(User.company).order_by(User.name)result = await session.execute(stmt)for row in result.scalars():print(row.id, row.name, row.age, row.company.name)# 查询 Company 数据,反向查询User表数据print("查询 Company 数据,反向查询User表数据")async with async_session() as session:stmt = select(Company).options(selectinload(Company.users))result = await session.execute(stmt)for row in result.scalars():print(row.id, row.name)for user in row.users:print('\t', user.id, user.name, user.age)await engine.dispose()asyncio.run(main())

执行上述代码,输出为:

output 
3 Jack 23 Baidu
2 Jerry 22 Alibaba
1 Tom 21 Baidu
查询 Company 数据,反向查询User表数据
1 Baidu1 Tom 213 Jack 23
2 Alibaba2 Jerry 22

说明:

  • User表定义有外键字段,与company是1对1对多关系,查询User表时,如果希望同时获得 Company表数据,应使用联合查询。
  • 查询Company表中,反向查询 User表数据,须处理懒加载问题,参考下一节.

3.3 关系查询中懒加载问题

如果A, B之间存在1对多关系, B中的外键指向A,SqlAlchemy在查询 A表的数据后,如果设置了反向查询字段,默认 SqlAlchemy会对关联表隐式地发送查询请求。由于这个I/O是同步的,因此 AsyncSession是不支持此操作。会Block此操作。

有两种解决办法:

方法一:引入AsyncAttrs Mixin混入类


from sqlalchemy.ext.asyncio import AsyncAttrsclass Base(AsyncAttrs, DeclarativeBase):   # Base引入AsyncAttrspass
# 表A与B之间存在外键关系。 
class A(Base):__tablename__ = "a"# ... rest of mapping ...bs: Mapped[List[B]] = relationship()  # 反射查询关系class B(Base):__tablename__ = "b"a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))# ... rest of mapping ...

A的 bs属性查询时是lazy load,将被做为 AsyncAttrs来处理,阻止其发磅IO
联合查询时,要手工用异步方式执行查询操作

a1 = (await session.scalars(select(A))).one()
for b1 in await a1.awaitable_attrs.bs:print(b1)

方式2: 用异步eager load加载关系表数据
如果不使用AsyncAttrs 方式,可用 eager load 来解决:
最常用eager load方法为selectinload() ,其与select()形成链式调用

stmt = select(A).options(selectinload(A.bs))
result = wait session.scalars(stmt)
for r in result: print(r.id, r.data, r.bs) 

注意:

  • 当A构建新对象时,对bs总是赋个空值, 如 A(bs=[], data="a2")

3.4 运行同步方法

如同上一节提到,Core API 的 AsyncConnection对象提供了run_sync()方法运行同步方法,同样ORM API中,AsyncSession对象也提供了run_sync() 执行同步方法。

await session.run_sync(fetch_and_update_objects)    # fetch_and_update_objects() 是1个同步方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/54066.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS文本格式化

通过 CSS 中的文本属性您可以像操作 Word 文档那样定义网页中文本的字符间距、对齐方式、缩进等等,CSS 中常用的文本属性如下所示: text-align:设置文本的水平对齐方式;text-decoration:设置文本的装饰;te…

【拥抱AIGC】通义灵码扩展管理

通义灵码提供了扩展管理,支持自定义指令,满足企业编码场景的扩展诉求。 适用版本 企业标准版、企业专属版 通义灵码管理员、组织内全局管理员(专属版)在通义灵码控制台-扩展管理中,进行自定义指令的管理、查看自定义…

leetcode|刷算法 线段树原理以及模板

线段树出现的题目特征 线段树使用的题目。每次操作都要得到返回结果的。 比如 699. 掉落的方块 - 力扣(LeetCode) 2286. 以组为单位订音乐会的门票 - 力扣(LeetCode) 1845. 座位预约管理系统 - 力扣(LeetCode&#…

【PHP陪玩系统源码】游戏陪玩系统app,陪玩小程序优势

陪玩系统开发运营级别陪玩成品搭建 支持二开源码交付,游戏开黑陪玩系统: 多客陪玩系统,游戏开黑陪玩,线下搭子,开黑陪玩系统 前端uniapp后端php,数据库MySQL 1、长时间的陪玩APP源码开发经验,始终坚持从客户…

Docker镜像命令和容器命令

目录 镜像命令 镜像命名规范 镜像操作命令 DockerHub拉取镜像 利用docker save将nginx镜像导出磁盘,然后再通过load加载回来 总结 容器命令介绍和案例 容器相关命令 案例:创建运行一个Nginx容器 总结 镜像命令 镜像命名规范 镜像名称一般分两…

uniapp框架中实现文件选择上传组件,可以选择图片、视频等任意文件并上传到当前绑定的服务空间

前言 uni-file-picker是uniapp中的一个文件选择器组件,用于选择本地文件并返回选择的文件路径或文件信息。该组件支持选择单个文件或多个文件,可以设置文件的类型、大小限制,并且可以进行文件预览。 提示:以下是本篇文章正文内容,下面案例可供参考 uni-file-picker组件具…

了解华为计算产品线,昇腾的业务都有哪些?

🍉 CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 随着 ChatGPT 的现象级爆红,它引领了 AI 大模型时代的深刻变革,进而造成 AI 算力资源日益紧缺。与此同时,中美贸易战的持续也使得 AI 算力国产化适配成为必然趋势。 …

Temporal Dynamic Quantization for Diffusion Models阅读

文章目录 AbstractIntroductionBackgrounds and Related Works2.1 扩散模型2.2 量化2.3 量化感知训练和训练后量化 TemporalDynamic Quantization3.1 量化方法3.2 扩散模型量化的挑战3.3 TDQ模块的实现3.4 工程细节时间步的频率编码TDQ模块的初始化 Experimental SetupResults5…

基于SpringBoot+Vue+MySQL的美食信息推荐系统

系统展示 用户前台界面 管理员后台界面 系统背景 在数字化时代,随着人们对美食文化的热爱与追求不断增长,美食信息推荐系统成为了连接食客与美食之间的重要桥梁。面对海量的美食信息,用户往往难以快速找到符合个人口味和需求的美食。因此&…

实用工具推荐---- PDF 转换

直接上链接:爱PDF |面向 PDF 爱好者的在线 PDF 工具 (ilovepdf.com) 主要功能如下: 全免费!!!!

FPGA远程烧录bit流

FPGA远程烧录bit流 Vivado支持远程编译并下载bit流到本地xilinx开发板。具体操作就是在连接JTAG的远程电脑上安装hw_server.exe。比如硬件板在实验室或者是其他地方,开发代码与工程在本地计算机,如何将bit流烧录到实验室或者远程开发板? vi…

机器学习-TopicModel

概率图模型基础概率潜在语义分析(PLSA)LDA 概率图模型基础 猜球游戏 有两个信封,其中一个装有一个红球,一个黑球。另一个信封有两个黑球。 。 假设红球价值100元,黑球价值1元。 你随机从其中拿起一个信封,从…

STM32 OLED

文章目录 前言一、OLED是什么?二、使用步骤1.复制 OLED.C .H文件1.1 遇到问题 2.统一风格3.主函数引用头文件3.1 oled.h 提供了什么函数 4.介绍显示一个字符的函数5. 显示十进制函数的讲解 三、使用注意事项3.1 配置符合自己的引脚3.2 花屏总结 前言 提示&#xff…

简单vue指令实现 el-table 可拖拽表格功能

安装 SortableJS sorttableJs 相关优点如下: 相关配置项 参考 👉 SortableJS中文官网 pnpm i sortablejs封装成指令 不多逼逼,直接上才艺 🤪🤪🤪 先安装一个 nanoid 插件 用于生成随机id,注…

亚洲市场|人工智能对固态硬盘SSD需求影响

随着人工智能(AI)技术的快速发展,对于高效能存储的需求也在日益增长。在亚洲市场中,固态硬盘(SSD)作为关键的数据存储设备,其重要性不言而喻。 扩展阅读: 内存:生成式AI带来全新挑战与机遇 这可能是最清晰的AI存储数…

瑞芯微RK3566鸿蒙开发板Android11修改第三方输入法为默认输入法

本文适用于触觉智能所有支持Android11系统的开发板修改第三方输入法为默认输入法。本次使用的是触觉智能的Purple Pi OH鸿蒙开源主板,搭载了瑞芯微RK3566芯片,类树莓派设计,是Laval官方社区主荐的一款鸿蒙开发主板。 一、安装输入法并查看输入…

YOLOv8改进 | 主干篇,YOLOv8改进主干网络为华为的轻量化架构GhostNetV1

摘要 摘要:将卷积神经网络(CNN)部署在嵌入式设备上是困难的,因为嵌入式设备的内存和计算资源有限。特征图的冗余是成功的 CNN 的一个重要特征,但在神经网络架构设计中很少被研究。作者提出了一种新颖的 Ghost 模块,用于通过廉价操作生成更多的特征图。基于一组内在特征图…

使用Python和Proxy302代理IP高效采集Bing图片

目录 项目背景一、项目准备环境配置 二、爬虫设计与实现爬虫设计思路目标网站分析数据获取流程 代码实现1. 初始化爬虫类(BingImageSpider)2. 创建存储文件夹3. 获取图像链接4. 下载图片5. 使用Proxy302代理IP6. 主运行函数 运行截图 三、总结 项目背景 …

VS开发 - 静态编译和动态编译的基础实践与混用

目录 1. 基础概念 2. 直观感受一下静态编译和动态编译的体积与依赖项目 3. VS运行时库包含哪些主要文件(从VS2015起) 4. 动态库和静态库混用的情况 5. 感谢清单 1. 基础概念 所谓的运行时库(Runtime Library)就是WINDOWS系统…

WPS在表格中填写材料时,内容过多导致表格不换页,其余内容无法正常显示 以及 内容过多,导致表格换页——解决方法

一、现象 1,内容过多导致表格不换页,其余内容无法正常显示 2,内容过多,导致表格换页 二、解决方法 在表格内右击,选择表格属性 在菜单栏选择行,勾选允许跨页断行,点击确定即可 1&#xff0…