I. Prerequisites:
You need MongoDB, Poetry, and Python 3.10+.
II. Clone the sample code locally
git clone git@github.com:waketzheng/fastapi_async_mongo_demo.git
III. Install dependencies
cd fastapi_async_mongo_demo
poetry shell
poetry install
IV. Start the server
python app/main.py
V. Open the local API docs
http://localhost:8000/docs
VI. Code walkthrough:
The directory layout is as follows:
.
├── app
│ ├── db.py
│ ├── main.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── item.py
│ │ └── user.py
│ ├── utils.py
│ └── views.py
├── poetry.lock
├── pyproject.toml
└── README.md
1. Database connection (app/db.py)
from pathlib import Path
from typing import TYPE_CHECKING

import motor.motor_asyncio

if TYPE_CHECKING:
    from motor.core import AgnosticClient

client: "AgnosticClient" = motor.motor_asyncio.AsyncIOMotorClient("localhost", 27017)
db_name = Path(__file__).parent.parent.name.replace("-", "_")
database = client[db_name]
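The db_name line above derives the database name from the name of the project directory. The sketch below isolates that one expression so it can be tried without MongoDB. The function name derive_db_name and the example path are hypothetical and not part of the demo repository; PurePosixPath is used only so the example behaves the same on every operating system.

```python
from pathlib import PurePosixPath


def derive_db_name(module_path: str) -> str:
    """Mirror the expression in app/db.py for an arbitrary (hypothetical) file path."""
    # parent -> the "app" directory, parent.parent -> the project directory
    project_dir = PurePosixPath(module_path).parent.parent
    # Hyphens in the directory name are replaced with underscores
    return project_dir.name.replace("-", "_")


# Hypothetical checkout location, used for illustration only
name = derive_db_name("/home/me/fastapi-async-mongo-demo/app/db.py")
print(name)
```

One consequence of this approach is that renaming the project directory also changes which database the app talks to.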
2. Attach the database instance when the server starts (app/main.py)
from contextlib import asynccontextmanager

from fastapi import FastAPI

from db import database


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.db = database
    yield


app = FastAPI(lifespan=lifespan)
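To see what the lifespan function does, it can help to run the same pattern without FastAPI or MongoDB. In the minimal sketch below, everything before the yield runs once at startup and everything after it runs at shutdown. The SimpleNamespace objects and the "fake-database" string are stand-ins assumed for illustration; they are not the real FastAPI app or Motor database.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

events: list[str] = []


@asynccontextmanager
async def lifespan(app):
    # Startup phase: attach the shared resource to app.state
    app.state.db = "fake-database"  # stand-in for the Motor database object
    events.append("startup")
    yield
    # Shutdown phase: runs after the "async with" body finishes
    events.append("shutdown")


async def main() -> list[str]:
    # Stand-in for a FastAPI instance: only the .state attribute is needed here
    app = SimpleNamespace(state=SimpleNamespace())
    async with lifespan(app):
        events.append(f"serving with {app.state.db}")
    return events


result = asyncio.run(main())
print(result)
```

Storing the database on app.state means request handlers can reach it through request.app.state.db instead of importing a module-level global.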
3. Look up a record by the id the user passes in and return it to the user as JSON
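from bson.objectid import ObjectId
from fastapi import HTTPException, Request


@app.get("/items/{id}")
async def item_data(request: Request, id: str) -> dict:
    collection = request.app.state.db['item']
    obj = await collection.find_one({"_id": ObjectId(id)})
    # find_one returns None when no document matches, so answer with 404
    if obj is None:
        raise HTTPException(status_code=404, detail="Item not found")
    data = {'id': str(obj['_id']), 'name': obj['name']}
    return data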
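Two details of the handler above are easy to exercise in isolation: the id in the URL must be convertible to an ObjectId, and the ObjectId must be turned into a string before the document can be returned as JSON. The following is a rough standard-library sketch of both steps. The helper names looks_like_object_id and to_response are hypothetical, and the regular expression only covers the 24-character hexadecimal string form of an ObjectId.

```python
import re

# The string form of an ObjectId is 24 hexadecimal characters
_OBJECT_ID_RE = re.compile(r"[0-9a-fA-F]{24}")


def looks_like_object_id(value: str) -> bool:
    """Return True if value has the 24-hex-character shape of an ObjectId string."""
    return _OBJECT_ID_RE.fullmatch(value) is not None


def to_response(doc: dict) -> dict:
    """Build the JSON-friendly payload: stringify _id and expose it as 'id'."""
    return {"id": str(doc["_id"]), "name": doc["name"]}


checks = [
    looks_like_object_id("65f0c0ffee0123456789abcd"),
    looks_like_object_id("not-an-id"),
]
# A plain integer stands in for the real ObjectId here
payload = to_response({"_id": 12345, "name": "xx", "description": None})
print(checks, payload)
```

In a real handler, bson's own ObjectId.is_valid(id) could serve the same purpose as the regular expression, letting the route reject a malformed id before querying the database.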
4. Create, update, delete, and list
# Create
data = {'name': 'xx', 'description': None}
inserted = await collection.insert_one(data)
object_id: str = str(inserted.inserted_id)

# Update
data = {'name': 'new_name'}
await collection.update_one({"_id": ObjectId(id)}, {"$set": data})

# Delete
await collection.delete_one({"_id": ObjectId(id)})

# List all records
objs = []
async for obj in collection.find():
    objs.append(obj)
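The calls above need a running MongoDB. To experiment with the same flow, and in particular to see that "$set" overwrites only the listed fields, a tiny in-memory stand-in can be enough. FakeCollection below is a hypothetical class written for this illustration; it is not part of Motor or of the demo repository, it matches documents by _id only, and its insert_one returns the new id directly rather than a result object with inserted_id.

```python
import asyncio
import itertools


class FakeCollection:
    """Hypothetical in-memory stand-in for a Motor collection (illustration only)."""

    def __init__(self) -> None:
        self._docs: dict[int, dict] = {}
        self._ids = itertools.count(1)

    async def insert_one(self, doc: dict) -> int:
        _id = next(self._ids)
        self._docs[_id] = {**doc, "_id": _id}
        return _id  # simplification: Motor returns an object with .inserted_id

    async def update_one(self, flt: dict, update: dict) -> None:
        # Only the fields listed under "$set" are overwritten
        self._docs[flt["_id"]].update(update["$set"])

    async def delete_one(self, flt: dict) -> None:
        self._docs.pop(flt["_id"], None)

    async def find(self):
        # Async generator, so it can be consumed with "async for"
        for doc in list(self._docs.values()):
            yield doc


async def main() -> list[dict]:
    collection = FakeCollection()
    first = await collection.insert_one({"name": "xx", "description": None})
    second = await collection.insert_one({"name": "yy", "description": "keep me"})
    await collection.update_one({"_id": second}, {"$set": {"name": "new_name"}})
    await collection.delete_one({"_id": first})
    return [doc async for doc in collection.find()]


remaining = asyncio.run(main())
print(remaining)
```

After the run, only the second document remains: its name has changed while its description is untouched.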