FastAPI Database Configuration Best Practices
1. Basic Configuration
1.1 Database Connection Configuration

from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import SQLAlchemyError
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Database URL
DATABASE_URL = "mysql+pymysql://user:password@localhost:3306/dbname"

# Connection pool settings
POOL_SIZE = 5        # default pool size
MAX_OVERFLOW = 10    # maximum number of overflow connections
POOL_TIMEOUT = 30    # seconds to wait for a connection from the pool
POOL_RECYCLE = 1800  # recycle connections after this many seconds

# Create the database engine
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=POOL_TIMEOUT,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,  # test connections before use and discard dead ones
    echo=False,          # set to True to log emitted SQL
)
1.2 Session Management

# Create the session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class for ORM models
Base = declarative_base()
2. Connection Pool Configuration Explained
2.1 Parameters

- pool_size: the number of connections the pool keeps open
  - Recommended: (CPU cores × 2) + effective disk spindles
  - Default: 5
  - Note: bigger is not automatically better; tune it against observed load (a pool-inspection sketch follows this list)
- max_overflow: the maximum number of connections allowed beyond pool_size
  - Recommended: at most 2 × pool_size
  - Purpose: absorbs traffic bursts
- pool_timeout: how long to wait for a connection from the pool
  - Unit: seconds
  - Recommended: 30-60 seconds
  - Note: prevents requests from waiting forever when the pool is exhausted
- pool_recycle: the maximum age of a connection before it is recycled
  - Unit: seconds
  - Recommended: 1800 seconds (30 minutes)
  - Purpose: avoids reusing connections the server has already dropped as idle
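To validate these settings against real load, the pool can be inspected at runtime. A minimal sketch using the engine from section 1.1 (the helper name is illustrative):

# Log live pool statistics to compare against the configured limits
def log_pool_status():
    pool = engine.pool
    logger.info(
        "pool size=%s checked_in=%s checked_out=%s overflow=%s",
        pool.size(), pool.checkedin(), pool.checkedout(), pool.overflow(),
    )
    # pool.status() returns the same numbers as a single summary string
    logger.info(pool.status())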
2.2 Connection Pool Event Listeners

@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    logger.debug("New database connection established")

@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
    logger.debug("Database connection checked out from pool")

@event.listens_for(engine, "checkin")
def checkin(dbapi_connection, connection_record):
    logger.debug("Database connection returned to pool")
3. FastAPI Dependency Injection
3.1 Database Session Dependency

from typing import Generator
from fastapi import Depends

def get_db() -> Generator:
    """FastAPI dependency that yields a database session.

    Usage:
        @app.get("/users/")
        def read_users(db: Session = Depends(get_db)):
            ...
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()  # commit if the request handler raised no exception
    except SQLAlchemyError as e:
        logger.error(f"Database error occurred: {str(e)}")
        db.rollback()  # roll back on database errors
        raise  # re-raise so FastAPI's exception handlers can respond
    except Exception as e:
        logger.error(f"Unexpected error occurred: {str(e)}")
        db.rollback()
        raise
    finally:
        db.close()  # always close the session
3.2 Usage Example

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

app = FastAPI()

@app.get("/users/")
def read_users(db: Session = Depends(get_db)):
    users = db.query(User).all()
    return users

@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(**user.dict())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
4. Health Checks
4.1 Database Connection Check

from sqlalchemy import text

def verify_db_connection() -> bool:
    """Verify that the database is reachable.

    Useful at application startup and in health-check endpoints.
    """
    db = SessionLocal()
    try:
        db.execute(text("SELECT 1"))  # raw SQL must be wrapped in text()
        return True
    except Exception as e:
        logger.error(f"Database connection verification failed: {str(e)}")
        return False
    finally:
        db.close()

@app.on_event("startup")
async def startup():
    """Verify the database connection when the application starts."""
    if not verify_db_connection():
        raise Exception("Database connection failed")
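The same helper can back a health-check endpoint; a minimal sketch (the route path and response shape are illustrative):

@app.get("/health")
def health_check():
    db_ok = verify_db_connection()
    return {"status": "ok" if db_ok else "degraded", "database": db_ok}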
5. Best-Practice Recommendations

- Connection pool configuration
  - Tune pool_size against actual load
  - Set a sensible pool_recycle so stale connections are not reused
  - Enable pool_pre_ping to detect dead connections automatically
- Transaction management
  - Manage transactions with context managers
  - Handle exceptions and roll back correctly
  - Avoid long-running transactions
- Performance tuning
  - Use autoflush judiciously
  - Set an appropriate pool_timeout
  - Monitor connection pool usage
- Security
  - Keep database credentials in environment variables (see the sketch after this list)
  - Restrict database user privileges
  - Rotate database passwords regularly
- Monitoring and logging
  - Log key database operations
  - Monitor connection pool state
  - Set appropriate log levels
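For the credentials point above, a minimal sketch of reading the connection string from the environment (the variable name is illustrative):

import os

# Read the connection string from the environment instead of hard-coding it
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "mysql+pymysql://user:password@localhost:3306/dbname",  # development fallback only
)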
6. Common Problems and Fixes

- Pool exhaustion

  # Fix: enlarge the pool
  engine = create_engine(
      DATABASE_URL,
      pool_size=10,     # larger base pool
      max_overflow=20,  # more overflow connections
      pool_timeout=60,  # wait longer before giving up
  )

- Connection timeouts

  # Fix: retry session acquisition with exponential backoff
  from tenacity import retry, stop_after_attempt, wait_exponential

  @retry(
      stop=stop_after_attempt(3),
      wait=wait_exponential(multiplier=1, min=4, max=10),
  )
  def get_db_session_with_retry():
      db = SessionLocal()
      db.execute(text("SELECT 1"))  # fail fast so the retry can fire
      return db

- Deadlocks

  # Fix: cap how long InnoDB waits for row locks (MySQL)
  from sqlalchemy import event

  @event.listens_for(engine, "begin")
  def set_transaction_timeout(conn):
      conn.execute(text("SET innodb_lock_wait_timeout = 15"))
7. Summary

Sound database configuration is critical to the performance and reliability of a FastAPI application. A well-tuned connection pool, correct transaction handling, and solid monitoring and logging together make for a robust data-access layer. Keep tuning the parameters against your actual workload. Whether you use dependency injection or manage sessions directly, always release database resources correctly, handle error cases, and follow the practices above.
8. Using the Database Without Dependency Injection
8.1 Context Managers

from contextlib import contextmanager

@contextmanager
def get_db_context():
    """Provide a database session as a context manager.

    Usage:
        with get_db_context() as db:
            result = db.query(User).all()
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception as e:
        logger.error(f"Database error: {str(e)}")
        db.rollback()
        raise
    finally:
        db.close()

# Usage examples
def get_user_by_id(user_id: int):
    with get_db_context() as db:
        return db.query(User).filter(User.id == user_id).first()

def create_new_user(user_data: dict):
    with get_db_context() as db:
        user = User(**user_data)
        db.add(user)
        db.flush()       # assign the primary key; the context manager commits
        db.refresh(user)
        return user
8.2 Decorators

from functools import wraps

def with_db(func):
    """Decorator that injects a managed database session.

    Usage:
        @with_db
        def my_function(db, *args, **kwargs):
            result = db.query(User).all()
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        db = SessionLocal()
        try:
            kwargs['db'] = db
            result = func(*args, **kwargs)
            db.commit()
            return result
        except Exception as e:
            logger.error(f"Database error: {str(e)}")
            db.rollback()
            raise
        finally:
            db.close()
    return wrapper

# Usage examples
@with_db
def get_user_list(db, skip: int = 0, limit: int = 100):
    return db.query(User).offset(skip).limit(limit).all()

@with_db
def update_user(db, user_id: int, user_data: dict):
    user = db.query(User).filter(User.id == user_id).first()
    for key, value in user_data.items():
        setattr(user, key, value)
    return user  # the decorator commits on success
8.3 Using Sessions Directly

from datetime import datetime

def get_db_session():
    """Return a new database session.

    Warning: the caller is responsible for the session's entire life cycle.
    """
    return SessionLocal()

# Usage example
def perform_complex_operation():
    db = get_db_session()
    try:
        # Several queries in one unit of work
        result1 = db.query(User).all()
        result2 = db.query(Order).all()

        # Some business logic
        for user in result1:
            user.last_login = datetime.now()

        # Bulk update
        db.bulk_save_objects(result1)
        db.commit()
        return {"users": result1, "orders": result2}
    except Exception as e:
        logger.error(f"Error in complex operation: {str(e)}")
        db.rollback()
        raise
    finally:
        db.close()
8.4 使用建议
-
选择合适的方式
- 简单操作:使用上下文管理器(
with get_db_context()
) - 重复性操作:使用装饰器(
@with_db
) - 复杂事务:直接使用会话
- 简单操作:使用上下文管理器(
-
注意事项
- 确保在所有情况下都正确关闭数据库会话
- 正确处理事务(提交/回滚)
- 适当的错误处理和日志记录
-
性能考虑
- 避免频繁创建和关闭会话
- 合理使用批量操作
- 注意事务范围的控制
-
最佳实践
- 在同一个事务中尽量减少数据库操作次数
- 使用适当的索引优化查询性能
- 定期监控数据库连接使用情况
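For the session-reuse point above, SQLAlchemy's scoped_session keeps one session per thread, so repeated calls share it instead of opening a new one each time; a minimal sketch:

from sqlalchemy.orm import scoped_session

# A thread-local session registry built on the existing factory
ThreadSession = scoped_session(SessionLocal)

def count_users() -> int:
    db = ThreadSession()  # returns this thread's session, created once
    return db.query(User).count()

# When the unit of work ends (e.g. at the end of a request), release it:
# ThreadSession.remove()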
9. Galera Cluster Advanced Configuration
9.1 Connection Configuration

from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
import random

# Galera Cluster node URLs
GALERA_NODES = [
    "mysql+pymysql://user:pass@node1:3306/dbname",
    "mysql+pymysql://user:pass@node2:3306/dbname",
    "mysql+pymysql://user:pass@node3:3306/dbname",
]

# Pool settings tuned for Galera
DB_KWARGS = {
    "pool_size": 5,
    "max_overflow": 10,
    "pool_timeout": 30,
    "pool_recycle": 1800,
    "pool_pre_ping": True,
    # Galera-specific driver arguments
    "connect_args": {
        "connect_timeout": 10,
        "read_timeout": 30,
        "write_timeout": 30,
        "charset": "utf8mb4",
        # Set the transaction isolation level for each session
        "init_command": "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED",
    },
}

def get_galera_engine(mode='random'):
    """Build an engine for the Galera Cluster.

    mode:
    - random:  pick a random node (suited to reads)
    - primary: use the primary node (suited to writes)
    - all:     return engines for every node (special scenarios)
    """
    if mode == 'random':
        # Random node for read traffic
        db_url = random.choice(GALERA_NODES)
    elif mode == 'primary':
        # Primary node for write traffic
        db_url = GALERA_NODES[0]
    else:
        # Engines for all nodes
        return [create_engine(url, **DB_KWARGS) for url in GALERA_NODES]
    return create_engine(db_url, **DB_KWARGS)
9.2 Read/Write Splitting Implementation

from contextlib import contextmanager

class GaleraSessionManager:
    """Session manager that splits reads and writes across Galera nodes."""

    def __init__(self):
        self.write_engine = get_galera_engine(mode='primary')
        self.read_engines = [get_galera_engine(mode='random') for _ in range(2)]
        # expire_on_commit=False keeps returned objects usable after commit
        self.WriteSessionLocal = sessionmaker(
            bind=self.write_engine, expire_on_commit=False
        )
        self.ReadSessionLocal = sessionmaker(bind=random.choice(self.read_engines))

    @contextmanager
    def get_read_db(self):
        """Yield a read session."""
        db = self.ReadSessionLocal()
        try:
            yield db
        finally:
            db.close()

    @contextmanager
    def get_write_db(self):
        """Yield a write session with commit/rollback handling."""
        db = self.WriteSessionLocal()
        try:
            yield db
            db.commit()
        except Exception:
            db.rollback()
            raise
        finally:
            db.close()

# Usage examples
galera_manager = GaleraSessionManager()

def read_user_profile(user_id: int):
    with galera_manager.get_read_db() as db:
        return db.query(User).filter(User.id == user_id).first()

def update_user_profile(user_id: int, data: dict):
    with galera_manager.get_write_db() as db:
        user = db.query(User).filter(User.id == user_id).first()
        for key, value in data.items():
            setattr(user, key, value)
9.3 Transactions and Consistency

from functools import wraps
import time

class GaleraTransaction:
    """Retry helper for Galera-specific deadlocks and certification conflicts."""

    MAX_RETRIES = 3
    RETRY_DELAY = 1  # seconds

    @staticmethod
    def atomic(func):
        """Retry a complete unit of work on deadlock or lock-wait timeout.

        The retry wraps the whole function: a transaction that hit a deadlock
        has already been rolled back and must be replayed from the beginning.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            retry_count = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    message = str(e).lower()
                    retriable = "deadlock" in message or "lock wait timeout" in message
                    if not retriable or retry_count >= GaleraTransaction.MAX_RETRIES:
                        raise
                    retry_count += 1
                    time.sleep(GaleraTransaction.RETRY_DELAY)
        return wrapper

# Usage example: the decorated function is replayed if the transaction conflicts
@GaleraTransaction.atomic
def process_order(order_id: int):
    with galera_manager.get_write_db() as db:
        # Update the order status
        order = db.query(Order).filter(Order.id == order_id).first()
        order.status = 'processing'
        # Adjust the inventory
        inventory = db.query(Inventory).filter(
            Inventory.product_id == order.product_id
        ).first()
        inventory.quantity -= order.quantity
9.4 High-Availability Configuration

import asyncio

class GaleraHighAvailability:
    """High-availability management for a Galera cluster."""

    def __init__(self):
        self.nodes = GALERA_NODES
        self.active_nodes = set()
        self.check_interval = 30  # seconds

    async def monitor_nodes(self):
        """Poll every node and track which ones are synced."""
        while True:
            for node in self.nodes:
                try:
                    engine = create_engine(node, **DB_KWARGS)
                    with engine.connect() as conn:
                        # Check the node's cluster state
                        result = conn.execute(
                            text("SHOW STATUS LIKE 'wsrep_local_state_comment'")
                        )
                        status = result.fetchone()[1]
                        if status == 'Synced':
                            self.active_nodes.add(node)
                        else:
                            self.active_nodes.discard(node)
                except Exception:
                    self.active_nodes.discard(node)
            await asyncio.sleep(self.check_interval)

    def get_available_node(self):
        """Return a random healthy node."""
        if not self.active_nodes:
            raise Exception("No available Galera nodes")
        return random.choice(list(self.active_nodes))

# Usage example
ha_manager = GaleraHighAvailability()

@app.on_event("startup")
async def startup_event():
    # Start node monitoring in the background
    asyncio.create_task(ha_manager.monitor_nodes())

def get_db_connection():
    """Get a session bound to a currently healthy node."""
    node = ha_manager.get_available_node()
    engine = create_engine(node, **DB_KWARGS)
    return sessionmaker(bind=engine)()
9.5 Performance Optimization Recommendations

- Read/write split strategy
  - Route reads to a random node
  - Pin writes to the primary node
  - Balance bulk operations across nodes
- Transaction handling
  - Use the READ COMMITTED isolation level
  - Retry on deadlocks (see GaleraTransaction.atomic above)
  - Keep transactions small and short-lived
- Connection pool tuning
  - Maintain a separate pool per node
  - Size each pool according to the node's role
  - Check for and prune idle connections periodically
- Monitoring and maintenance
  - Monitor node state in real time
  - Track replication lag (see the sketch after this list)
  - Automate failover
- Query optimization
  - Use suitable indexes
  - Avoid long transactions
  - Throttle concurrent writes
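For the replication-lag point above, Galera exposes wsrep_* status variables that can be polled per node. A minimal sketch reusing the engines from 9.1 (the metric names are standard Galera status variables; the helper itself is illustrative):

def check_galera_lag(engine) -> dict:
    """Sample Galera queue and flow-control metrics on one node."""
    names = ("wsrep_local_recv_queue", "wsrep_flow_control_paused")
    metrics = {}
    with engine.connect() as conn:
        for name in names:
            row = conn.execute(
                text("SHOW STATUS LIKE :name"), {"name": name}
            ).fetchone()
            if row:
                metrics[row[0]] = row[1]
    return metrics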
10. Complete CRUD Example
10.1 Model Definitions

from datetime import datetime
from typing import Optional
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.sql import func
from pydantic import BaseModel, EmailStr

# SQLAlchemy model
class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(50), unique=True, index=True)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

# Pydantic models
class UserBase(BaseModel):
    email: EmailStr
    username: str

class UserCreate(UserBase):
    password: str

class UserUpdate(BaseModel):
    email: Optional[EmailStr] = None
    username: Optional[str] = None
    is_active: Optional[bool] = None

class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime
    updated_at: Optional[datetime]

    class Config:
        orm_mode = True
10.2 Service Layer Implementation

from passlib.context import CryptContext
from typing import List, Optional

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class UserService:
    def __init__(self, galera_manager: GaleraSessionManager):
        self.db_manager = galera_manager

    def get_user(self, user_id: int) -> Optional[User]:
        """Fetch a single user."""
        with self.db_manager.get_read_db() as db:
            return db.query(User).filter(User.id == user_id).first()

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Fetch a user by email address."""
        with self.db_manager.get_read_db() as db:
            return db.query(User).filter(User.email == email).first()

    def get_users(self, skip: int = 0, limit: int = 100) -> List[User]:
        """Fetch a page of users."""
        with self.db_manager.get_read_db() as db:
            return db.query(User).offset(skip).limit(limit).all()

    @GaleraTransaction.atomic
    def create_user(self, user: UserCreate) -> User:
        """Create a new user."""
        # Reject duplicate email addresses
        if self.get_user_by_email(user.email):
            raise ValueError("Email already registered")
        with self.db_manager.get_write_db() as db:
            hashed_password = pwd_context.hash(user.password)
            db_user = User(
                email=user.email,
                username=user.username,
                hashed_password=hashed_password,
            )
            db.add(db_user)
            db.flush()  # assign the auto-increment id
            db.refresh(db_user)
            return db_user

    @GaleraTransaction.atomic
    def update_user(self, user_id: int, user_update: UserUpdate) -> Optional[User]:
        """Update an existing user."""
        with self.db_manager.get_write_db() as db:
            db_user = db.query(User).filter(User.id == user_id).first()
            if not db_user:
                return None
            # Apply only the fields that were provided
            update_data = user_update.dict(exclude_unset=True)
            for key, value in update_data.items():
                setattr(db_user, key, value)
            db.flush()
            db.refresh(db_user)
            return db_user

    @GaleraTransaction.atomic
    def delete_user(self, user_id: int) -> bool:
        """Delete a user."""
        with self.db_manager.get_write_db() as db:
            db_user = db.query(User).filter(User.id == user_id).first()
            if not db_user:
                return False
            db.delete(db_user)
            return True
10.3 API Route Implementation

from fastapi import APIRouter, HTTPException, status
from typing import List

router = APIRouter(prefix="/users", tags=["users"])

# Initialize the service layer
galera_manager = GaleraSessionManager()
user_service = UserService(galera_manager)

@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
    try:
        return user_service.create_user(user)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create user",
        )

@router.get("/{user_id}", response_model=UserResponse)
async def read_user(user_id: int):
    user = user_service.get_user(user_id)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user

@router.get("/", response_model=List[UserResponse])
async def read_users(skip: int = 0, limit: int = 100):
    return user_service.get_users(skip=skip, limit=limit)

@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate):
    user = user_service.update_user(user_id, user_update)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user

@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int):
    success = user_service.delete_user(user_id)
    if not success:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
10.4 Usage Examples

# 1. Create a user
curl -X POST "http://localhost:8000/users/" \
    -H "Content-Type: application/json" \
    -d '{"email": "user@example.com", "username": "testuser", "password": "secret123"}'

# 2. List users
curl "http://localhost:8000/users/?skip=0&limit=10"

# 3. Fetch a single user
curl "http://localhost:8000/users/1"

# 4. Update a user
curl -X PUT "http://localhost:8000/users/1" \
    -H "Content-Type: application/json" \
    -d '{"username": "newusername"}'

# 5. Delete a user
curl -X DELETE "http://localhost:8000/users/1"
10.5 Key Features

- Read/write splitting
  - Reads (get_user, get_users) run against read nodes
  - Writes (create_user, update_user, delete_user) run against the write node
- Transaction handling
  - Every write operation runs under the GaleraTransaction.atomic retry decorator
  - Deadlocks and conflicts are retried automatically
- Error handling
  - Business-rule errors (e.g. an email that is already registered)
  - Database errors
  - HTTP error responses
- Performance
  - Indexes on email and username
  - Paginated queries
  - Updates touch only the fields that were supplied
- Validation
  - Input validated with Pydantic models
  - Explicit response models
11. High Concurrency and Massive Data Volumes
11.1 High-Concurrency Optimization
11.1.1 Connection Pool Configuration

from sqlalchemy import create_engine
import multiprocessing

# Derive a pool size from the CPU count
CPU_COUNT = multiprocessing.cpu_count()
POOL_SIZE = CPU_COUNT * 2 + 1   # base connections
MAX_OVERFLOW = POOL_SIZE * 2    # maximum overflow connections

# Pool configuration for high-concurrency workloads
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
    echo_pool=True,  # log pool events so pool behaviour can be observed
)

# Pool monitoring
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
    logger.info(f"Connection pool size: {engine.pool.size()}")
    logger.info(f"Currently checked-out connections: {engine.pool.checkedout()}")
11.1.2 Asynchronous Query Handling

from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.future import select

# Async database engine
async_engine = create_async_engine(
    "mysql+asyncmy://user:pass@localhost/db",
    pool_size=20,
    max_overflow=30,
)

async def get_async_session():
    async with AsyncSession(async_engine) as session:
        yield session

# Async query example
async def async_get_users(skip: int = 0, limit: int = 100):
    async with AsyncSession(async_engine) as session:
        query = select(User).offset(skip).limit(limit)
        result = await session.execute(query)
        return result.scalars().all()

# API implementation
@router.get("/users/async")
async def get_users_async(
    background_tasks: BackgroundTasks,
    skip: int = 0,
    limit: int = 100,
):
    # Push slow follow-up work into a background task
    # (process_user_data is assumed to be defined elsewhere)
    background_tasks.add_task(process_user_data)
    return await async_get_users(skip, limit)
11.1.3 Concurrency Control

from asyncio import Semaphore
from functools import wraps
from fastapi import HTTPException

# Cap the number of concurrent database operations
MAX_CONCURRENT_REQUESTS = 100
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)

async def controlled_operation():
    async with semaphore:
        # perform the database operation here
        pass

# Distributed locking with Redis
from redis import Redis
from redis.lock import Lock

redis_client = Redis(host='localhost', port=6379, db=0)

def with_distributed_lock(lock_name: str, timeout: int = 10):
    def decorator(func):
        @wraps(func)  # preserve the signature so FastAPI can inspect it
        async def wrapper(*args, **kwargs):
            lock = Lock(redis_client, lock_name, timeout=timeout)
            if not lock.acquire(blocking=True, blocking_timeout=5):
                raise HTTPException(
                    status_code=423,
                    detail="Resource is locked",
                )
            try:
                return await func(*args, **kwargs)
            finally:
                lock.release()
        return wrapper
    return decorator

# Usage example (batch_create_users is assumed to be defined elsewhere)
@router.post("/users/batch")
@with_distributed_lock("create_users_lock")
async def create_users_batch(users: List[UserCreate]):
    return await batch_create_users(users)
11.2 Massive Data Processing
11.2.1 Batch Processing

from typing import List, Generator

class BatchProcessor:
    def __init__(self, batch_size: int = 1000):
        self.batch_size = batch_size
        self.galera_manager = GaleraSessionManager()

    def process_large_dataset(self, data: List[dict]):
        """Process a large dataset batch by batch, one transaction per batch."""
        for batch in self._get_batches(data):
            self._process_batch(batch)

    def _get_batches(self, data: List[dict]) -> Generator[List[dict], None, None]:
        """Split the dataset into batch_size-sized chunks."""
        for i in range(0, len(data), self.batch_size):
            yield data[i:i + self.batch_size]

    @GaleraTransaction.atomic
    def _process_batch(self, batch: List[dict]):
        """Insert one batch inside its own retried transaction."""
        with self.galera_manager.get_write_db() as db:
            try:
                # bulk_insert_mappings issues an efficient multi-row INSERT
                db.bulk_insert_mappings(User, batch)
                db.flush()
            except Exception as e:
                logger.error(f"Error processing batch: {str(e)}")
                raise

# Usage example
@router.post("/users/bulk")
async def bulk_create_users(
    background_tasks: BackgroundTasks,
    users: List[UserCreate],
):
    processor = BatchProcessor(batch_size=1000)
    # Run the heavy processing as a background task;
    # bulk_insert_mappings expects plain dicts, not Pydantic models
    background_tasks.add_task(
        processor.process_large_dataset,
        [u.dict() for u in users],
    )
    return {"message": "Bulk processing started"}
11.2.2 Stream Processing

from fastapi.responses import StreamingResponse
import csv
import io

class StreamProcessor:
    CHUNK_SIZE = 1000

    @staticmethod
    async def stream_large_query(query):
        """Fetch a large result set chunk by chunk instead of all at once."""
        with SessionLocal() as db:
            result_proxy = db.execute(query)
            while True:
                chunk = result_proxy.fetchmany(StreamProcessor.CHUNK_SIZE)
                if not chunk:
                    break
                yield chunk

    @staticmethod
    async def export_large_dataset():
        """Export a large dataset as CSV, one chunk at a time."""
        output = io.StringIO()
        writer = csv.writer(output)

        # Header row
        writer.writerow(["id", "email", "username", "created_at"])

        query = text("SELECT id, email, username, created_at FROM users")
        async for chunk in StreamProcessor.stream_large_query(query):
            for row in chunk:
                writer.writerow(row)
            # Emit what has accumulated, then reset the buffer
            output.seek(0)
            data = output.read()
            output.seek(0)
            output.truncate(0)
            yield data

# API implementation
@router.get("/users/export")
async def export_users():
    return StreamingResponse(
        StreamProcessor.export_large_dataset(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"},
    )
11.2.3 Partitioned Tables

# Note: the DDL below uses PostgreSQL declarative partitioning;
# MySQL uses a different partitioning syntax.
import datetime

def create_partition_table(year: int, month: int):
    """Build the DDL for a monthly partition of the users table."""
    partition_name = f"users_{year}_{month:02d}"

    # Partition boundaries: first day of this month to first day of the next
    start_date = datetime.datetime(year, month, 1)
    if month == 12:
        end_date = datetime.datetime(year + 1, 1, 1)
    else:
        end_date = datetime.datetime(year, month + 1, 1)

    partition_sql = f"""
        CREATE TABLE {partition_name}
        PARTITION OF users
        FOR VALUES FROM ('{start_date}') TO ('{end_date}')
    """
    return partition_sql

class PartitionManager:
    def __init__(self):
        self.engine = create_engine(DATABASE_URL)

    def ensure_partition_exists(self, date: datetime.datetime):
        """Create the partition for the given date if it is missing."""
        partition_name = f"users_{date.year}_{date.month:02d}"

        exists_query = text("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = :partition_name
            )
        """)
        with self.engine.connect() as conn:
            exists = conn.execute(
                exists_query, {"partition_name": partition_name}
            ).scalar()
            if not exists:
                conn.execute(text(create_partition_table(date.year, date.month)))

# Usage example
@router.post("/users/partitioned")
async def create_user_partitioned(user: UserCreate):
    partition_manager = PartitionManager()
    # Make sure this month's partition exists before inserting
    partition_manager.ensure_partition_exists(datetime.datetime.now())

    # Create the user
    with galera_manager.get_write_db() as db:
        db_user = User(
            email=user.email,
            username=user.username,
            hashed_password=pwd_context.hash(user.password),
        )
        db.add(db_user)
        return db_user
11.2.4 Query Optimization

from sqlalchemy.orm import selectinload

class QueryOptimizer:
    def __init__(self, db: Session):
        self.db = db

    def optimize_large_table_query(self, last_id: int = 0):
        """Illustrates several techniques for querying large tables."""
        # 1. Index hints (MySQL optimizer-hint syntax)
        hinted = text("""
            SELECT /*+ INDEX(users email_idx) */ *
            FROM users
            WHERE email LIKE :pattern
        """)

        # 2. Eager-load relationships to avoid N+1 queries
        eager = self.db.query(User).options(selectinload(User.orders))

        # 3. Select only indexed columns so the index covers the query
        covering = self.db.query(User.id, User.email, User.username).filter(
            User.is_active == True
        )

        # 4. Keyset pagination: seek past the last seen id instead of OFFSET
        query = (
            self.db.query(User)
            .filter(User.id > last_id)
            .order_by(User.id)
            .limit(100)
        )
        return query

    def analyze_query(self, query: str):
        """Inspect a query's execution plan."""
        return self.db.execute(text(f"EXPLAIN ANALYZE {query}"))

# Usage example
@router.get("/users/optimized")
async def get_users_optimized(
    db: Session = Depends(get_db),
    last_id: int = 0,
    limit: int = 100,
):
    optimizer = QueryOptimizer(db)
    query = optimizer.optimize_large_table_query(last_id)
    return query.all()
11.3 Performance Monitoring

import time
from functools import wraps
from prometheus_client import Counter, Histogram

# Prometheus metrics
DB_OPERATION_DURATION = Histogram(
    'db_operation_duration_seconds',
    'Database operation duration',
    ['operation_type'],
)
DB_OPERATION_ERRORS = Counter(
    'db_operation_errors_total',
    'Database operation errors',
    ['operation_type'],
)

def monitor_db_operation(operation_type: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                DB_OPERATION_DURATION.labels(
                    operation_type=operation_type
                ).observe(duration)
                return result
            except Exception:
                DB_OPERATION_ERRORS.labels(
                    operation_type=operation_type
                ).inc()
                raise
        return wrapper
    return decorator

# Usage example
@router.get("/users/monitored")
@monitor_db_operation("get_users")
async def get_users_monitored(skip: int = 0, limit: int = 100):
    return await async_get_users(skip, limit)
11.4 Best-Practice Recommendations

- High concurrency
  - Size the connection pool appropriately
  - Apply request rate limiting
  - Use async operations
  - Use distributed locks where needed
- Large data volumes
  - Process data in batches
  - Stream large result sets
  - Partition the data
  - Optimize query performance
- Performance monitoring
  - Instrument database operations
  - Record performance metrics
  - Alert on anomalies
- Scalability
  - Scale horizontally
  - Split reads and writes
  - Add a caching layer (all three are covered in the next section)
12. Horizontal Scaling, Read/Write Splitting, and Caching Strategies
12.1 Horizontal Scaling Implementation

from typing import List, Optional
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import random

class DatabaseCluster:
    """Database cluster manager."""

    def __init__(self):
        # Primary database
        self.master_db = {
            "url": "mysql+pymysql://user:pass@master:3306/db",
            "engine": None,
            "session_maker": None,
        }
        # Replica databases
        self.slave_dbs = [
            {
                "url": "mysql+pymysql://user:pass@slave1:3306/db",
                "weight": 1,  # weight used for load balancing
                "engine": None,
                "session_maker": None,
            },
            {
                "url": "mysql+pymysql://user:pass@slave2:3306/db",
                "weight": 1,
                "engine": None,
                "session_maker": None,
            },
        ]
        self._initialize_connections()

    def _initialize_connections(self):
        """Initialize all database connections."""
        # Primary connection
        self.master_db["engine"] = create_engine(
            self.master_db["url"],
            pool_size=5,
            max_overflow=10,
            pool_timeout=30,
        )
        self.master_db["session_maker"] = sessionmaker(bind=self.master_db["engine"])

        # Replica connections
        for slave in self.slave_dbs:
            slave["engine"] = create_engine(
                slave["url"],
                pool_size=10,
                max_overflow=20,
                pool_timeout=30,
            )
            slave["session_maker"] = sessionmaker(bind=slave["engine"])

    def get_master_session(self):
        """Session bound to the primary."""
        return self.master_db["session_maker"]()

    def get_slave_session(self):
        """Session bound to a replica (weighted random choice)."""
        total_weight = sum(slave["weight"] for slave in self.slave_dbs)
        r = random.uniform(0, total_weight)
        current_weight = 0
        for slave in self.slave_dbs:
            current_weight += slave["weight"]
            if r <= current_weight:
                return slave["session_maker"]()
        # Fallback: first replica
        return self.slave_dbs[0]["session_maker"]()

# Usage example
db_cluster = DatabaseCluster()

class UserService:
    """User service demonstrating read/write splitting."""

    @staticmethod
    def get_user_by_id(user_id: int) -> Optional[User]:
        """Read path: query a replica."""
        session = db_cluster.get_slave_session()
        try:
            return session.query(User).filter(User.id == user_id).first()
        finally:
            session.close()

    @staticmethod
    def create_user(user_data: dict) -> User:
        """Write path: write to the primary."""
        session = db_cluster.get_master_session()
        try:
            user = User(**user_data)
            session.add(user)
            session.commit()
            return user
        finally:
            session.close()
12.2 Read/Write Splitting Strategy

from contextlib import contextmanager
from typing import Generator
from sqlalchemy.orm import Session

class ReadWriteManager:
    """Read/write-splitting manager."""

    def __init__(self):
        self.db_cluster = DatabaseCluster()

    @contextmanager
    def read_session(self) -> Generator[Session, None, None]:
        """Yield a read session (replica)."""
        session = self.db_cluster.get_slave_session()
        try:
            yield session
        finally:
            session.close()

    @contextmanager
    def write_session(self) -> Generator[Session, None, None]:
        """Yield a write session (primary)."""
        session = self.db_cluster.get_master_session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

# Service-layer implementation
class EnhancedUserService:
    def __init__(self):
        self.db_manager = ReadWriteManager()

    def get_users(self, skip: int = 0, limit: int = 100) -> List[User]:
        """Read example."""
        with self.db_manager.read_session() as session:
            return session.query(User).offset(skip).limit(limit).all()

    def create_user(self, user_data: dict) -> User:
        """Write example."""
        with self.db_manager.write_session() as session:
            user = User(**user_data)
            session.add(user)
            return user

    def update_user(self, user_id: int, user_data: dict) -> Optional[User]:
        """Read-then-write example."""
        # First check existence on a replica
        with self.db_manager.read_session() as read_session:
            user = read_session.query(User).filter(User.id == user_id).first()
            if not user:
                return None

        # Then apply the update on the primary
        with self.db_manager.write_session() as write_session:
            user = write_session.query(User).filter(User.id == user_id).first()
            for key, value in user_data.items():
                setattr(user, key, value)
            return user
12.3 Caching Strategy Implementation

from functools import wraps
from typing import Any, Optional
import json
from redis import Redis
from datetime import timedelta

class CacheManager:
    """Cache manager backed by Redis."""

    def __init__(self):
        self.redis_client = Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True,
        )
        self.default_ttl = timedelta(minutes=30)

    def get(self, key: str) -> Optional[str]:
        """Read a cache entry."""
        return self.redis_client.get(key)

    def set(self, key: str, value: Any, ttl: Optional[timedelta] = None):
        """Write a cache entry."""
        if ttl is None:
            ttl = self.default_ttl
        if isinstance(value, (dict, list)):
            value = json.dumps(value)
        self.redis_client.set(key, value, ex=int(ttl.total_seconds()))

    def delete(self, key: str):
        """Delete a cache entry."""
        self.redis_client.delete(key)

    def clear_pattern(self, pattern: str):
        """Delete all keys matching a pattern."""
        keys = self.redis_client.keys(pattern)
        if keys:
            self.redis_client.delete(*keys)

# Caching decorator
def cache_result(
    prefix: str,
    ttl: Optional[timedelta] = None,
    key_func=None,
):
    """Caching decorator.

    :param prefix: cache key prefix
    :param ttl: cache expiry
    :param key_func: custom cache-key builder (recommended for methods,
                     since `self` would otherwise end up in the key)
    """
    cache_manager = CacheManager()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build the cache key
            if key_func:
                cache_key = f"{prefix}:{key_func(*args, **kwargs)}"
            else:
                # Default: derive the key from the call arguments
                key_parts = [str(arg) for arg in args]
                key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))
                cache_key = f"{prefix}:{':'.join(key_parts)}"

            # Try the cache first
            cached_result = cache_manager.get(cache_key)
            if cached_result:
                return json.loads(cached_result)

            # Fall back to the wrapped function
            result = await func(*args, **kwargs)

            # Store the result
            cache_manager.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator

# Usage example (User.to_dict() is assumed to serialize a row to a dict)
class CachedUserService:
    def __init__(self):
        self.db_manager = ReadWriteManager()
        self.cache_manager = CacheManager()

    @cache_result(prefix="user", ttl=timedelta(minutes=10))
    async def get_user(self, user_id: int) -> Optional[dict]:
        """User lookup with caching."""
        with self.db_manager.read_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if user:
                return user.to_dict()
            return None

    async def update_user(self, user_id: int, user_data: dict) -> Optional[dict]:
        """Update a user and invalidate related cache entries."""
        with self.db_manager.write_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                return None
            for key, value in user_data.items():
                setattr(user, key, value)

            # Invalidate this user's cache entry
            self.cache_manager.delete(f"user:{user_id}")
            # Invalidate list caches that may contain the user
            self.cache_manager.clear_pattern("user_list:*")
            return user.to_dict()

    @cache_result(prefix="user_list", ttl=timedelta(minutes=5))
    async def get_user_list(
        self,
        skip: int = 0,
        limit: int = 100,
        status: Optional[str] = None,
    ) -> List[dict]:
        """User list query with caching."""
        with self.db_manager.read_session() as session:
            query = session.query(User)
            if status:
                query = query.filter(User.status == status)  # assumes a status column
            users = query.offset(skip).limit(limit).all()
            return [user.to_dict() for user in users]
@router.get("/users/{user_id}")
async def get_user(user_id: int):service = CachedUserService()user = await service.get_user(user_id)if not user:raise HTTPException(status_code=404, detail="User not found")return user@router.put("/users/{user_id}")
async def update_user(user_id: int, user_update: UserUpdate):service = CachedUserService()user = await service.update_user(user_id, user_update.dict())if not user:raise HTTPException(status_code=404, detail="User not found")return user@router.get("/users/")
async def get_users(skip: int = 0,limit: int = 100,status: Optional[str] = None
):service = CachedUserService()return await service.get_user_list(skip, limit, status)
12.4 Best-Practice Recommendations

- Horizontal scaling
  - Size the primary and replica connection pools appropriately
  - Implement failover
  - Monitor database load and performance
  - Check primary-replica replication status regularly
- Read/write splitting
  - Distinguish read and write operations explicitly
  - Account for replication lag on reads
  - Preserve transactional consistency
  - Distribute read and write traffic sensibly
- Caching
  - Pick an appropriate cache granularity
  - Set sensible expiry times
  - Invalidate related entries promptly
  - Guard against cache stampedes and avalanches (a TTL-jitter sketch follows this list)
- Performance
  - Use connection pooling
  - Queue requests under load
  - Add monitoring and alerting
  - Tune the database regularly
- Availability
  - Implement health checks
  - Add retry logic
  - Build in fault tolerance
  - Degrade gracefully when dependencies fail
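For cache avalanches specifically, one common mitigation is to add jitter to expiry times so that many keys do not expire at the same instant; a minimal sketch on top of the CacheManager above (the helper name is illustrative):

import random
from datetime import timedelta

def jittered_ttl(base: timedelta, spread: float = 0.2) -> timedelta:
    """Randomize a TTL by ±spread so cached keys do not all expire together."""
    factor = 1 + random.uniform(-spread, spread)
    return timedelta(seconds=base.total_seconds() * factor)

# cache_manager.set("user:1", payload, ttl=jittered_ttl(timedelta(minutes=30)))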
12.5 Usage Recommendations

- When horizontal scaling fits
  - Read-heavy, write-light workloads
  - High-concurrency query traffic
  - Workloads that need data redundancy
  - Geographically distributed deployments
- When read/write splitting fits
  - Heavily skewed read/write ratios
  - A need for higher read throughput
  - Workloads that tolerate slightly stale reads
  - Many complex read queries
- When caching fits
  - Hot-spot data access
  - Computation-heavy queries
  - Latency-sensitive endpoints
  - Reducing database load
- General cautions
  - Review system performance regularly
  - Monitor resource usage
  - Keep backups current
  - Have an incident-response plan ready