数据库和ORMS:使用SQLAlchemy与数据库通信

文章目录

    • 1. 环境安装
    • 2. 使用SQLAlchemy与SQL数据库通信
      • 2.1 创建表
      • 2.2 连接数据库
      • 2.3 insert、select
      • 2.4 update、delete
      • 2.5 relationships
      • 2.6 用Alembic进行数据库迁移

learn from 《Building Data Science Applications with FastAPI》

1. 环境安装

docker 安装 MongoDB 服务

 docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4

2. 使用SQLAlchemy与SQL数据库通信

安装 pip install databases[sqlite]

2.1 创建表

# models.pyimport sqlalchemy
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Fieldmetadata = sqlalchemy.MetaData()  # 创建元数据对象posts = sqlalchemy.Table(  # 创建表对象'posts',  # 表名metadata,  # 元数据对象# 列对象(列名,类型,其他选项)sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True, autoincrement=True),sqlalchemy.Column('publication_date', sqlalchemy.DateTime(), nullable=False),sqlalchemy.Column('title', sqlalchemy.String(255), nullable=False),sqlalchemy.Column('text', sqlalchemy.Text(), nullable=False),
)class PostBase(BaseModel):title: strtext: strpublication_date: datetime = Field(dafault_factory=datetime.now)class PostPartialUpdate(BaseModel):text: Optional[str] = Nonecontent: Optional[str] = Noneclass PostCreate(PostBase):passclass PostDB(PostBase):id: int

2.2 连接数据库

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:28
# @Author : Michael
# @File : database.py
# @desc :
import sqlalchemy
from databases import Database
DB_URL = 'sqlite:///cp6_sqlalchemy.db'
database = Database(DB_URL)
sqlalchemy_engine = sqlalchemy.create_engine(DB_URL)def get_database() -> Database:return database

2.3 insert、select

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :from typing import List, Tuple
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, statusfrom database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdateapp = FastAPI()@app.on_event('startup') # 启动的时候执行数据库连接
async def startup():await get_database().connect()metadata.create_all(sqlalchemy_engine)@app.on_event("shutdown") # 关闭的时候执行数据库断开连接
async def shutdown():await get_database().disconnect()async def pagination(skip: int = Query(0, ge=0),limit: int = Query(10, ge=0),) -> Tuple[int, int]:capped_limit = min(100, limit)return (skip, capped_limit)async def get_post_or_404(id: int, database: Database = Depends(get_database)) -> PostDB:select_query = posts.select().where(posts.c.id == id)raw_post = await database.fetch_one(select_query)if raw_post is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)return PostDB(**raw_post)# 开始插入数据
@app.post("/posts/", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, db: Database = Depends(get_database)) -> PostDB:# 创建插入语句,不必手写sqlinsert_query = posts.insert().values(post.dict())# 执行插入语句命令post_id = await db.execute(insert_query)post_db = await get_post_or_404(post_id, db)return post_db@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:return post@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database),) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return resultsif __name__ == '__main__':uvicorn.run(app='app:app', host="127.0.0.1", port=8001, reload=True, debug=True)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.4 update、delete

# update
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(post_update: PostPartialUpdate,post: PostDB = Depends(get_post_or_404),database: Database = Depends(get_database)) -> PostDB:update_query = (posts.update().where(posts.c.id == post.id).values(post_update.dict(exclude_unset=True)))await database.execute(update_query)post_db = await get_post_or_404(post.id, database)return post_db

在这里插入图片描述

# delete
@app.delete("/posts/{id}",status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostDB = Depends(get_post_or_404),database: Database = Depends(get_database)) -> None:delete_query = posts.delete().where(posts.c.id == post.id)await database.execute(delete_query)

2.5 relationships

models.py 编写新的表

comments = sqlalchemy.Table("comments",metadata,sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),# 定义连接的外键sqlalchemy.Column("post_id", sqlalchemy.ForeignKey("posts.id", ondelete="CASCADE"), nullable=False),sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),sqlalchemy.Column("content", sqlalchemy.Text(), nullable=False),
)class CommentBase(BaseModel):post_id: intpublication_date: datetime = Field(default_factory=datetime.now)content: strclass CommentCreate(CommentBase):passclass CommentDB(CommentBase):id: int

app.py 添加内容

from typing import List, Mapping, Tuple, cast
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:# 选取post表单数据select_post_query = posts.select().where(posts.c.id == comment.post_id)post = await database.fetch_one(select_post_query)if post is None:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Post {id} does not exist")# 插入comment 语句insert_query = comments.insert().values(comment.dict())comment_id = await database.execute(insert_query)# 查询 commentselect_query = comments.select().where(comments.c.id == comment_id)raw_comment = cast(Mapping, await database.fetch_one(select_query))return CommentDB(**raw_comment)


获取一个post的全部comments

models.py

class PostPublic(PostDB):comments: Optional[List[CommentDB]] = None

app.py

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :from typing import List, Mapping, Tuple, cast
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, statusfrom database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB, \PostPublicapp = FastAPI()@app.on_event('startup')  # 启动的时候执行数据库连接
async def startup():await get_database().connect()metadata.create_all(sqlalchemy_engine)@app.on_event("shutdown")  # 关闭的时候执行数据库断开连接
async def shutdown():await get_database().disconnect()async def pagination(skip: int = Query(0, ge=0),limit: int = Query(10, ge=0), ) -> Tuple[int, int]:capped_limit = min(100, limit)return (skip, capped_limit)async def get_post_or_404(id: int, database: Database = Depends(get_database)) -> PostPublic:select_query = posts.select().where(posts.c.id == id)raw_post = await database.fetch_one(select_query)if raw_post is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)# 编号为id的post的所有commentsselect_post_comment_query = comments.select().where(comments.c.post_id == id)raw_comments = await database.fetch_all(select_post_comment_query)comments_list = [CommentDB(**row) for row in raw_comments]return PostPublic(**raw_post, comments=comments_list)# 开始插入数据
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, db: Database = Depends(get_database)) -> PostPublic:# 创建插入语句,不必手写sqlinsert_query = posts.insert().values(post.dict())# 执行插入语句命令post_id = await db.execute(insert_query)post_db = await get_post_or_404(post_id, db)return post_db@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database), ) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return results@app.get("/posts/{id}", response_model=PostPublic)
async def get_post(post: PostPublic = Depends(get_post_or_404)) -> PostPublic:return post@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database), ) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return results# update
@app.patch("/posts/{id}", response_model=PostPublic)
async def update_post(post_update: PostPartialUpdate,post: PostPublic = Depends(get_post_or_404),database: Database = Depends(get_database)) -> PostPublic:update_query = (posts.update().where(posts.c.id == post.id).values(post_update.dict(exclude_unset=True)))await database.execute(update_query)post_db = await get_post_or_404(post.id, database)return post_db# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostPublic = Depends(get_post_or_404),database: Database = Depends(get_database)) -> None:delete_query = posts.delete().where(posts.c.id == post.id)await database.execute(delete_query)@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:select_post_query = posts.select().where(posts.c.id == comment.post_id)post = await database.fetch_one(select_post_query)if post is None:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Post {comment.post_id} does not exist")insert_query = comments.insert().values(comment.dict())comment_id = await database.execute(insert_query)select_query = comments.select().where(comments.c.id == comment_id)raw_comment = cast(Mapping, await database.fetch_one(select_query))return CommentDB(**raw_comment)if __name__ == '__main__':uvicorn.run(app='app:app', host="127.0.0.1", port=8001, reload=True, debug=True)

在这里插入图片描述
在这里插入图片描述

2.6 用Alembic进行数据库迁移

pip install alembic

终端输入:

alembic init alembic

初始化迁移环境,其中包括一组文件和目录,Alembic将在其中存储其配置和迁移文件,需要一起提交 git
在这里插入图片描述

在 env.py 中导入元数据

from web_python_dev.sqlalchemy1.models import metadatatarget_metadata = metadata

编辑ini配置
在这里插入图片描述

开始迁移

alembic revision --autogenerate -m "Initial migration"

之后会生成一个py文件
在这里插入图片描述
该代码内有两个函数:upgradedowngrade用于数据迁移和回滚

# 升级
alembic upgrade head

数据的迁移和升级之前请做好备份和测试,防止丢失损坏
https://alembic.sqlalchemy.org/en/latest/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/471138.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Web框架——Flask系列之数据库迁移(二十)

一、Flask-Migrate扩展 在开发过程中,需要修改数据库模型,而且还要在修改之后更新数据库。最直接的方式就是删除旧表,但这样会丢失数据。 更好的解决办法是使用数据库迁移框架,它可以追踪数据库模式的变化,然后把变动…

LeetCode 2206. 将数组划分成相等数对

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 nums ,它包含 2 * n 个整数。 你需要将 nums 划分成 n 个数对,满足: 每个元素 只属于一个 数对。同一数对中的元素 相等 。 如果可以将 nums 划分成 n 个数对,请你返回 true …

【iCore3 双核心板】例程三十五:HTTP_IAP_ARM实验——更新升级STM32

实验指导书及代码包下载: http://pan.baidu.com/s/1eRgzSPW iCore3 购买链接: https://item.taobao.com/item.htm?id524229438677 转载于:https://www.cnblogs.com/xiaomagee/p/5143326.html

Web框架——Flask系列之蓝图Blueprint(二十一)

一、为什么学习蓝图? 我们学习Flask框架,是从写单个文件,执行hello world开始的。我们在这单个文件中可以定义路由、视图函数、定义模型等等。但这显然存在一个问题:随着业务代码的增加,将所有代码都放在单个程序文件…

LeetCode 2207. 字符串中最多数目的子字符串(前缀和)

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始的字符串 text 和另一个下标从 0 开始且长度为 2 的字符串 pattern ,两者都只包含小写英文字母。 你可以在 text 中任意位置插入 一个 字符,这个插入的字符必须是 pattern[0] 或者 pattern[1] 。注…

指令系统——指令格式(详解)

一、总览 二、指令的定义 指令(又称机器指令): 是指示计算机执行某种操作的命令,是计算机运行的最小功能单位。 一台计算机的所有指令的集合构成该机的指令系统,也称为指令集。 注:一台计算机只能执行自己…

MySQL中Index Condition Pushdown(ICP)优化

在MySQL 5.6开始支持的一种根据索引进行查询的优化方式。之前的MySQL数据库版本不支持ICP,当进行索引查询是,首先根据索引来查找记录,然后在根据WHERE条件来过滤记录。在支持ICP后,MySQL数据库会在取出索引的同时,判断…

LeetCode 2208. 将数组和减半的最少操作次数(优先队列)

文章目录1. 题目2. 解题1. 题目 给你一个正整数数组 nums 。每一次操作中,你可以从 nums 中选择 任意 一个数并将它减小到 恰好 一半。(注意,在后续操作中你可以对减半过的数继续执行操作) 请你返回将 nums 数组和 至少 减少一半…

指令系统——数据存放、指令寻址(详解)

一、总览 二、数据存放 三、指令寻址 四、小结:

数据库和ORMS:使用Tortoise ORM与数据库通信

文章目录1. 安装环境2. 创建数据库模型3. 设置 Tortoise 引擎4. create5. 查询6. 修改、删除7. 添加关联8. 用Aerich建立数据库迁移系统learn from 《Building Data Science Applications with FastAPI》Tortoise ORM 是一种现代异步 ORM,非常适合 FastAPI项目 1. …

指令系统——数据寻址(1)(详解)

一、总览 二、操作数类型与寻址方式 三、数据寻址 四、立即寻址 立即寻址:形式地址A就是操作数本身,又称为立即数,一般采用补码形式。#表示立即寻址特征。 一条指令的执行:取指令访存1次,执行指令访存0次,暂…

使用 docker 搭建开发环境

作为一个 freelancer,经常能够接到很多的开发工作,这些金主,有喜欢 PHP 的,有习惯撒手不管的;有偏好 sqlite 的,也有喜欢 PG 的,我甚至见过 mysql、PG 一起使用的项目;同一门语言下&…

数据库和ORMS:使用 Motor 跟 MongoDB 通信

文章目录1. 安装2. 创建models3. 连接数据库4. 插入文档5. 查询6. 更新、删除7. 嵌套文档learn from 《Building Data Science Applications with FastAPI》面向文档的数据库(如MongoDB)不需要预先配置模式 Motor,这是一个用于与 MongoDB 异…

指令系统——数据寻址(2)(详解)

一、总览 二、偏移寻址 基址寻址:将CPU中基址寄存器(BR)的内容加上指令格式中的形式地址A,而形成操作数的有效地址,即EA(BR)A。 注:基址寄存器是面向操作系统的,其内容由…

十五、MySQL变量(系统变量、自定义变量)相关知识总结

变量: 系统变量: a.全局变量 b.会话变量 自定义变量: a.用户变量 b.局部变量 一、系统变量 说明:变量由系统定义,不是用户定义,属于服务器层面 注意:全局变量需要添加global关键字,会话变量…

LeetCode 2210. 统计数组中峰和谷的数量

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始的整数数组 nums 。如果两侧距 i 最近的不相等邻居的值均小于 nums[i] ,则下标 i 是 nums 中,某个峰的一部分。 类似地,如果两侧距 i 最近的不相等邻居的值均大于 nums[i] ,…

【项目总结】如何获取地图上的所有POI

1. 地图POI数据是什么,有什么用 关于地图数据123,可以参考一下这篇 https://www.zhihu.com/question/21530085/answer/18728706,回答了很多信息;下图是摘自其中,展示了建立一个地图需要的几个图层数据,从底…

十六、MySQL流程控制结构(顺序、分支、循环)详解 强化练习

流程控制结构:顺序、分支、循环 一、分支结构 case结构作为表达式: case结构作为独立的语句: if函数 语法:if(条件,值1,值2) 功能:实现双分支 应用在begin end中或外面 case结构 语法: 情…

LeetCode 2211. 统计道路上的碰撞次数

文章目录1. 题目2. 解题1. 题目 在一条无限长的公路上有 n 辆汽车正在行驶。汽车按从左到右的顺序按从 0 到 n - 1 编号,每辆车都在一个 独特的 位置。 给你一个下标从 0 开始的字符串 directions ,长度为 n 。 directions[i] 可以是 ‘L’、‘R’ 或 …

服务端的思考

概述 我们思考发布一个web服务需要做的工作,并进行职责的划分。职责的划分 通常我们会先制定抽象的接口,然后根据接口构造实现类。接口和实现类都完成了,再实施发布。所以,服务端的角色可以划分为:服务接口、服务实现、…