文章目录
- 1. 安装
- 2. 创建models
- 3. 连接数据库
- 4. 插入文档
- 5. 查询
- 6. 更新、删除
- 7. 嵌套文档
learn from 《Building Data Science Applications with FastAPI》
面向文档的数据库(如MongoDB)不需要预先配置模式
Motor,这是一个用于与 MongoDB 异步通信的库,由MongoDB组织官方支持
1. 安装
pip install motor
Successfully installed motor-2.5.1 pymongo-3.12.3
2. 创建models
- MongoDB 会为每个文件创建
_id
属性作为唯一标识符,但是_
开头的变量被 Pydantic 认为是私有的,不会作为数据字段 _id
是二进制对象,不被 Pydantic 支持
# _*_ coding: utf-8 _*_
# @Time : 2022/3/23 14:37
# @Author : Michael
# @File : models.py
# @desc :from datetime import datetime
from typing import Optional
from bson import ObjectId # A MongoDB ObjectId
from pydantic import BaseModel, Fieldclass PyObjectId(ObjectId):@classmethoddef __get_validators__(cls):yield cls.validate@classmethoddef validate(cls, v):if not ObjectId.is_valid(v):raise ValueError("Invalid objectid")return ObjectId(v)@classmethoddef __modify_schema__(cls, field_schema):field_schema.update(type="string")class MongoBaseModel(BaseModel):# PyObjectId 类的作用是 使得 ObjectId 成为 Pydantic 兼容的类型id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")# alias 是一个 pydantic选项,在调用 dict 方法时,会转换为 _id 名,这是MongoDB需要的class Config:json_encoders = {ObjectId: str}# json序列化时,采用的映射方法,ObjectId自己实现了__str__,可以被映射为 strclass PostBase(MongoBaseModel):title: strcontent: strpublication_date: datetime = Field(default_factory=datetime.now)class PostPartialUpdate(BaseModel):title: Optional[str] = Nonecontent: Optional[str] = Noneclass PostCreate(PostBase):passclass PostDB(PostBase):pass
3. 连接数据库
https://docs.mongodb.com/manual/
reference/connection-string/
# _*_ coding: utf-8 _*_
# @Time : 2022/3/23 14:37
# @Author : Michael
# @File : app.py.py
# @desc :from typing import List, Tuple
from bson import ObjectId, errors # BSON (Binary JSON) encoding and decoding
from fastapi import Depends, FastAPI, HTTPException, Query, status
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabasefrom web_python_dev.mongo_motor.models import PostDB, PostCreate, PostPartialUpdateapp = FastAPI()
motor_client = AsyncIOMotorClient("mongodb://localhost:27017"
)
database = motor_client["cp6_mongodb"] # 单个数据库实例def get_database() -> AsyncIOMotorDatabase:return database
4. 插入文档
async def pagination(skip: int = Query(0, ge=0),limit: int = Query(10, ge=0)) -> Tuple[int, int]:capped_limit = min(100, limit)return (skip, capped_limit)async def get_object_id(id: str) -> ObjectId:try:return ObjectId(id)except (errors.InvalidId, TypeError):raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)async def get_post_or_404(id: ObjectId = Depends(get_object_id),database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:raw_post = await database['posts'].find_one({"_id": id})if raw_post is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)return PostDB(**raw_post)
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:post_db = PostDB(**post.dict())await database["posts"].insert_one(post_db.dict(by_alias=True))# by_alias=True 使用 _id 来序列化post_db = await get_post_or_404(post_db.id, database)return post_db
测试之前需要 docker 开启服务
docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4
5. 查询
@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: AsyncIOMotorDatabase = Depends(get_database)
) -> List[PostDB]:skip, limit = paginationquery = database['posts'].find({}, skip=skip, limit=limit)# find 第一个参数 是过滤用的,我们要获取所有的,所以留空result = [PostDB(**raw_post) async for raw_post in query]# async 列表推导return result
@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:return post
6. 更新、删除
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(post_update: PostPartialUpdate,post: PostDB = Depends(get_post_or_404),database: AsyncIOMotorDatabase = Depends(get_database),
) -> PostDB:await database["posts"].update_one({"_id": post.id}, {"$set": post_update.dict(exclude_unset=True)})post_db = await get_post_or_404(post.id, database)return post_db
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostDB = Depends(get_post_or_404),database: AsyncIOMotorDatabase = Depends(get_database),
):await database["posts"].delete_one({"_id": post.id})
7. 嵌套文档
如果我们想将 post 和 comment 一起存储
在 models.py 中添加
class CommentBase(BaseModel):publication_date: datetime = Field(default_factory=datetime.now)content: strclass CommentCreate(CommentBase):passclass CommentDB(CommentBase):passclass PostDB(PostBase):comments: List[CommentDB] = Field(default_factory=list)
现在我们在查询一下看看,发现 comments 也在结果当中
那我们要创建 comments 的内容
@app.post("/posts/{id}/comments", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment:CommentCreate,post:PostDB=Depends(get_post_or_404),database:AsyncIOMotorDatabase=Depends(get_database))->PostDB:await database['posts'].update_one({'_id': post.id}, {'$push': {'comments': comment.dict()}})# $push操作。这是向列表属性添加元素的有用运算符post_db = await get_post_or_404(post.id, database)return post_db
更多update操作:https://www.mongodb.com/docs/manual/reference/operator/update/