FastAPI 数据库配置最佳实践

FastAPI 数据库配置最佳实践

1. 基础配置

1.1 数据库连接配置

from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import SQLAlchemyError
import logging# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)# 数据库URL配置
DATABASE_URL = "mysql+pymysql://user:password@localhost:3306/dbname"# 连接池配置
POOL_SIZE = 5  # 默认连接池大小
MAX_OVERFLOW = 10  # 最大溢出连接数
POOL_TIMEOUT = 30  # 连接池获取超时时间
POOL_RECYCLE = 1800  # 连接回收时间(秒)# 创建数据库引擎
engine = create_engine(DATABASE_URL,poolclass=QueuePool,pool_size=POOL_SIZE,max_overflow=MAX_OVERFLOW,pool_timeout=POOL_TIMEOUT,pool_recycle=POOL_RECYCLE,pool_pre_ping=True,  # 自动检测失效连接echo=False  # 是否打印SQL语句
)

1.2 会话管理

# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)# 声明基类
Base = declarative_base()

2. 连接池配置说明

2.1 参数解释

  • pool_size: 连接池保持的连接数

    • 建议值: (CPU核心数 × 2) + 有效磁盘数
    • 默认值: 5
    • 说明: 不是越大越好,需要根据实际负载调整
  • max_overflow: 超过 pool_size 后的最大允许连接数

    • 建议值: pool_size 的 2 倍以内
    • 作用: 处理突发流量
  • pool_timeout: 从连接池获取连接的超时时间

    • 单位: 秒
    • 建议值: 30-60秒
    • 说明: 防止连接池耗尽时无限等待
  • pool_recycle: 连接重用的最大时间

    • 单位: 秒
    • 建议值: 1800秒(30分钟)
    • 作用: 防止数据库断开空闲连接

2.2 连接池事件监听

@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):logger.debug("New database connection established")@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):logger.debug("Database connection checked out from pool")@event.listens_for(engine, "checkin")
def checkin(dbapi_connection, connection_record):logger.debug("Database connection returned to pool")

3. FastAPI 依赖注入

3.1 数据库会话依赖

from typing import Generator
from fastapi import Dependsdef get_db() -> Generator:"""FastAPI 依赖函数,用于获取数据库会话使用方法:@app.get("/users/")def read_users(db: Session = Depends(get_db)):..."""db = SessionLocal()try:yield dbdb.commit()  # 如果没有异常发生,提交事务except SQLAlchemyError as e:logger.error(f"Database error occurred: {str(e)}")db.rollback()  # 发生异常时回滚raise  # 重新抛出异常,让 FastAPI 的异常处理器处理except Exception as e:logger.error(f"Unexpected error occurred: {str(e)}")db.rollback()raisefinally:db.close()  # 确保关闭会话

3.2 使用示例

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Sessionapp = FastAPI()@app.get("/users/")
def read_users(db: Session = Depends(get_db)):users = db.query(User).all()return users@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):db_user = User(**user.dict())db.add(db_user)db.commit()db.refresh(db_user)return db_user

4. 健康检查

4.1 数据库连接检查

def verify_db_connection() -> bool:"""验证数据库连接是否正常可以在应用启动时或者健康检查时使用"""try:db = SessionLocal()db.execute("SELECT 1")return Trueexcept Exception as e:logger.error(f"Database connection verification failed: {str(e)}")return Falsefinally:db.close()@app.on_event("startup")
async def startup():"""应用启动时验证数据库连接"""if not verify_db_connection():raise Exception("Database connection failed")

5. 最佳实践建议

  1. 连接池配置

    • 根据实际负载调整 pool_size
    • 设置合理的 pool_recycle 防止连接失效
    • 启用 pool_pre_ping 自动检测连接状态
  2. 事务管理

    • 使用 context manager 管理事务
    • 确保正确的异常处理和回滚
    • 避免长事务
  3. 性能优化

    • 合理使用 autoflush
    • 适当设置 pool_timeout
    • 监控连接池使用情况
  4. 安全性

    • 使用环境变量管理数据库凭证
    • 限制数据库用户权限
    • 定期更新数据库密码
  5. 监控和日志

    • 记录关键数据库操作
    • 监控连接池状态
    • 设置合适的日志级别

6. 常见问题处理

  1. 连接池耗尽

    # 解决方案:调整连接池配置
    engine = create_engine(DATABASE_URL,pool_size=10,  # 增加连接池大小max_overflow=20,  # 增加最大溢出连接数pool_timeout=60  # 增加超时时间
    )
    
  2. 连接超时

    # 解决方案:添加重试机制
    from tenacity import retry, stop_after_attempt, wait_exponential@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def get_db_with_retry():return get_db()
    
  3. 死锁处理

    # 解决方案:设置事务超时
    from sqlalchemy import event@event.listens_for(engine, "begin")
    def set_transaction_timeout(conn):conn.execute("SET innodb_lock_wait_timeout = 15")
    

7. 总结

良好的数据库配置对于 FastAPI 应用的性能和可靠性至关重要。通过合理配置连接池、正确处理事务、实施监控和日志记录,可以构建一个健壮的数据库访问层。要根据实际应用场景和负载情况,不断调整和优化配置参数。

8. 不使用依赖注入的数据库使用方式

8.1 使用上下文管理器

from contextlib import contextmanager@contextmanager
def get_db_context():"""使用上下文管理器获取数据库会话使用方式:with get_db_context() as db:result = db.query(User).all()"""db = SessionLocal()try:yield dbdb.commit()except Exception as e:logger.error(f"Database error: {str(e)}")db.rollback()raisefinally:db.close()# 使用示例
def get_user_by_id(user_id: int):with get_db_context() as db:return db.query(User).filter(User.id == user_id).first()def create_new_user(user_data: dict):with get_db_context() as db:user = User(**user_data)db.add(user)db.commit()db.refresh(user)return user

8.2 使用装饰器

from functools import wrapsdef with_db(func):"""数据库会话装饰器使用方式:@with_dbdef my_function(db, *args, **kwargs):result = db.query(User).all()"""@wraps(func)def wrapper(*args, **kwargs):db = SessionLocal()try:kwargs['db'] = dbresult = func(*args, **kwargs)db.commit()return resultexcept Exception as e:logger.error(f"Database error: {str(e)}")db.rollback()raisefinally:db.close()return wrapper# 使用示例
@with_db
def get_user_list(db, skip: int = 0, limit: int = 100):return db.query(User).offset(skip).limit(limit).all()@with_db
def update_user(db, user_id: int, user_data: dict):user = db.query(User).filter(User.id == user_id).first()for key, value in user_data.items():setattr(user, key, value)db.commit()return user

8.3 直接使用会话

def get_db_session():"""获取数据库会话的简单函数警告:使用此方法时需要手动管理会话的生命周期"""return SessionLocal()# 使用示例
def perform_complex_operation():db = get_db_session()try:# 执行多个数据库操作result1 = db.query(User).all()result2 = db.query(Order).all()# 一些业务逻辑处理for user in result1:user.last_login = datetime.now()# 批量更新db.bulk_save_objects(result1)db.commit()return {"users": result1, "orders": result2}except Exception as e:logger.error(f"Error in complex operation: {str(e)}")db.rollback()raisefinally:db.close()

8.4 使用建议

  1. 选择合适的方式

    • 简单操作:使用上下文管理器(with get_db_context()
    • 重复性操作:使用装饰器(@with_db
    • 复杂事务:直接使用会话
  2. 注意事项

    • 确保在所有情况下都正确关闭数据库会话
    • 正确处理事务(提交/回滚)
    • 适当的错误处理和日志记录
  3. 性能考虑

    • 避免频繁创建和关闭会话
    • 合理使用批量操作
    • 注意事务范围的控制
  4. 最佳实践

    • 在同一个事务中尽量减少数据库操作次数
    • 使用适当的索引优化查询性能
    • 定期监控数据库连接使用情况

7. 总结

良好的数据库配置对于 FastAPI 应用的性能和可靠性至关重要。通过合理配置连接池、正确处理事务、实施监控和日志记录,可以构建一个健壮的数据库访问层。要根据实际应用场景和负载情况,不断调整和优化配置参数。无论是使用依赖注入还是直接使用数据库会话,都要确保正确管理数据库资源,处理异常情况,并遵循最佳实践。

9. Galera Cluster 高级配置

9.1 连接配置

from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
import random# Galera Cluster 节点配置
GALERA_NODES = ["mysql+pymysql://user:pass@node1:3306/dbname","mysql+pymysql://user:pass@node2:3306/dbname","mysql+pymysql://user:pass@node3:3306/dbname"
]def get_galera_engine(mode='random'):"""获取 Galera Cluster 数据库引擎mode: - random: 随机选择节点(适合读操作)- primary: 使用主节点(适合写操作)- all: 返回所有节点的引擎(特殊场景使用)"""if mode == 'random':# 随机选择一个节点用于读操作db_url = random.choice(GALERA_NODES)elif mode == 'primary':# 使用主节点用于写操作db_url = GALERA_NODES[0]else:# 返回所有节点的引擎return [create_engine(url, **DB_KWARGS) for url in GALERA_NODES]return create_engine(db_url, **DB_KWARGS)# 针对 Galera 的连接池配置
DB_KWARGS = {"pool_size": 5,"max_overflow": 10,"pool_timeout": 30,"pool_recycle": 1800,"pool_pre_ping": True,# Galera 特定配置"connect_args": {"connect_timeout": 10,"read_timeout": 30,"write_timeout": 30,"charset": "utf8mb4",# 设置事务隔离级别"init_command": "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED"}
}

9.2 读写分离实现

class GaleraSessionManager:"""Galera Cluster 会话管理器,实现读写分离"""def __init__(self):self.write_engine = get_galera_engine(mode='primary')self.read_engines = [get_galera_engine(mode='random') for _ in range(2)]self.WriteSessionLocal = sessionmaker(bind=self.write_engine)self.ReadSessionLocal = sessionmaker(bind=random.choice(self.read_engines))@contextmanagerdef get_read_db(self):"""获取读会话"""db = self.ReadSessionLocal()try:yield dbfinally:db.close()@contextmanagerdef get_write_db(self):"""获取写会话"""db = self.WriteSessionLocal()try:yield dbdb.commit()except Exception:db.rollback()raisefinally:db.close()# 使用示例
galera_manager = GaleraSessionManager()def read_user_profile(user_id: int):with galera_manager.get_read_db() as db:return db.query(User).filter(User.id == user_id).first()def update_user_profile(user_id: int, data: dict):with galera_manager.get_write_db() as db:user = db.query(User).filter(User.id == user_id).first()for key, value in data.items():setattr(user, key, value)

9.3 事务和一致性处理

from contextlib import contextmanager
from sqlalchemy.orm import Session
import timeclass GaleraTransaction:"""Galera 集群事务管理器"""MAX_RETRIES = 3RETRY_DELAY = 1  # 秒@staticmethod@contextmanagerdef atomic(db: Session):"""原子事务处理器,处理 Galera 特有的死锁和冲突"""retry_count = 0while True:try:yieldbreakexcept Exception as e:if retry_count >= GaleraTransaction.MAX_RETRIES:raiseif "deadlock" in str(e).lower() or "lock wait timeout" in str(e).lower():retry_count += 1time.sleep(GaleraTransaction.RETRY_DELAY)continueraise# 使用示例
def process_order(order_id: int):with galera_manager.get_write_db() as db:with GaleraTransaction.atomic(db):# 更新订单状态order = db.query(Order).filter(Order.id == order_id).first()order.status = 'processing'# 更新库存inventory = db.query(Inventory).filter(Inventory.product_id == order.product_id).first()inventory.quantity -= order.quantity

9.4 高可用性配置

class GaleraHighAvailability:"""Galera 集群高可用性管理"""def __init__(self):self.nodes = GALERA_NODESself.active_nodes = set()self.check_interval = 30  # 秒async def monitor_nodes(self):"""监控节点状态"""while True:for node in self.nodes:try:engine = create_engine(node, **DB_KWARGS)with engine.connect() as conn:# 检查节点状态result = conn.execute("SHOW STATUS LIKE 'wsrep_local_state_comment'")status = result.fetchone()[1]if status == 'Synced':self.active_nodes.add(node)else:self.active_nodes.discard(node)except Exception:self.active_nodes.discard(node)await asyncio.sleep(self.check_interval)def get_available_node(self):"""获取可用节点"""if not self.active_nodes:raise Exception("No available Galera nodes")return random.choice(list(self.active_nodes))# 使用示例
ha_manager = GaleraHighAvailability()@app.on_event("startup")
async def startup_event():# 启动节点监控asyncio.create_task(ha_manager.monitor_nodes())def get_db_connection():"""获取高可用数据库连接"""node = ha_manager.get_available_node()engine = create_engine(node, **DB_KWARGS)return sessionmaker(bind=engine)()

9.5 性能优化建议

  1. 读写分离策略

    • 读操作使用随机节点
    • 写操作固定使用主节点
    • 批量操作考虑负载均衡
  2. 事务处理

    • 使用 READ COMMITTED 隔离级别
    • 实现死锁重试机制
    • 控制事务大小和持续时间
  3. 连接池优化

    • 为不同节点维护独立的连接池
    • 根据节点角色调整连接池大小
    • 定期检查并清理空闲连接
  4. 监控和维护

    • 实时监控节点状态
    • 记录同步延迟
    • 实现故障自动转移
  5. 查询优化

    • 使用合适的索引
    • 避免长事务
    • 控制并发写入操作

7. 总结

良好的数据库配置对于 FastAPI 应用的性能和可靠性至关重要。通过合理配置连接池、正确处理事务、实施监控和日志记录,可以构建一个健壮的数据库访问层。要根据实际应用场景和负载情况,不断调整和优化配置参数。无论是使用依赖注入还是直接使用数据库会话,都要确保正确管理数据库资源,处理异常情况,并遵循最佳实践。

10. 完整的 CRUD 示例

10.1 模型定义

from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.sql import func
from pydantic import BaseModel, EmailStr
from typing import Optional# SQLAlchemy 模型
class User(Base):__tablename__ = "users"id = Column(Integer, primary_key=True, index=True)email = Column(String(255), unique=True, index=True, nullable=False)username = Column(String(50), unique=True, index=True)hashed_password = Column(String(255), nullable=False)is_active = Column(Boolean, default=True)created_at = Column(DateTime(timezone=True), server_default=func.now())updated_at = Column(DateTime(timezone=True), onupdate=func.now())# Pydantic 模型
class UserBase(BaseModel):email: EmailStrusername: strclass UserCreate(UserBase):password: strclass UserUpdate(BaseModel):email: Optional[EmailStr] = Noneusername: Optional[str] = Noneis_active: Optional[bool] = Noneclass UserResponse(UserBase):id: intis_active: boolcreated_at: datetimeupdated_at: Optional[datetime]class Config:orm_mode = True

10.2 服务层实现

from passlib.context import CryptContext
from typing import List, Optionalpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")class UserService:def __init__(self, galera_manager: GaleraSessionManager):self.db_manager = galera_managerdef get_user(self, user_id: int) -> Optional[User]:"""读取单个用户"""with self.db_manager.get_read_db() as db:return db.query(User).filter(User.id == user_id).first()def get_user_by_email(self, email: str) -> Optional[User]:"""通过邮箱读取用户"""with self.db_manager.get_read_db() as db:return db.query(User).filter(User.email == email).first()def get_users(self, skip: int = 0, limit: int = 100) -> List[User]:"""读取用户列表"""with self.db_manager.get_read_db() as db:return db.query(User).offset(skip).limit(limit).all()def create_user(self, user: UserCreate) -> User:"""创建新用户"""with self.db_manager.get_write_db() as db:with GaleraTransaction.atomic(db):# 检查邮箱是否已存在if self.get_user_by_email(user.email):raise ValueError("Email already registered")# 创建用户hashed_password = pwd_context.hash(user.password)db_user = User(email=user.email,username=user.username,hashed_password=hashed_password)db.add(db_user)db.flush()  # 获取自增IDdb.refresh(db_user)return db_userdef update_user(self, user_id: int, user_update: UserUpdate) -> Optional[User]:"""更新用户信息"""with self.db_manager.get_write_db() as db:with GaleraTransaction.atomic(db):db_user = db.query(User).filter(User.id == user_id).first()if not db_user:return None# 更新非空字段update_data = user_update.dict(exclude_unset=True)for key, value in update_data.items():setattr(db_user, key, value)db.flush()db.refresh(db_user)return db_userdef delete_user(self, user_id: int) -> bool:"""删除用户"""with self.db_manager.get_write_db() as db:with GaleraTransaction.atomic(db):db_user = db.query(User).filter(User.id == user_id).first()if not db_user:return Falsedb.delete(db_user)return True

10.3 API 路由实现

from fastapi import APIRouter, HTTPException, status, Depends
from typing import Listrouter = APIRouter(prefix="/users", tags=["users"])# 初始化服务
galera_manager = GaleraSessionManager()
user_service = UserService(galera_manager)@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):try:return user_service.create_user(user)except ValueError as e:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,detail=str(e))except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,detail="Failed to create user")@router.get("/{user_id}", response_model=UserResponse)
async def read_user(user_id: int):user = user_service.get_user(user_id)if user is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail="User not found")return user@router.get("/", response_model=List[UserResponse])
async def read_users(skip: int = 0, limit: int = 100):return user_service.get_users(skip=skip, limit=limit)@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate):user = user_service.update_user(user_id, user_update)if user is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail="User not found")return user@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int):success = user_service.delete_user(user_id)if not success:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail="User not found")

10.4 使用示例

# 1. 创建用户
curl -X POST "http://localhost:8000/users/" \-H "Content-Type: application/json" \-d '{"email": "user@example.com", "username": "testuser", "password": "secret123"}'# 2. 获取用户列表
curl "http://localhost:8000/users/?skip=0&limit=10"# 3. 获取单个用户
curl "http://localhost:8000/users/1"# 4. 更新用户
curl -X PUT "http://localhost:8000/users/1" \-H "Content-Type: application/json" \-d '{"username": "newusername"}'# 5. 删除用户
curl -X DELETE "http://localhost:8000/users/1"

10.5 关键特性说明

  1. 读写分离

    • 读操作(get_user, get_users)使用读节点
    • 写操作(create_user, update_user, delete_user)使用写节点
  2. 事务处理

    • 所有写操作都在 GaleraTransaction.atomic 中执行
    • 自动处理死锁和重试
  3. 异常处理

    • 业务逻辑异常(如邮箱已存在)
    • 数据库异常
    • HTTP 响应异常
  4. 性能优化

    • 使用索引(email, username)
    • 分页查询
    • 选择性更新字段
  5. 数据验证

    • 使用 Pydantic 模型验证输入数据
    • 明确的响应模型定义

7. 总结

良好的数据库配置对于 FastAPI 应用的性能和可靠性至关重要。通过合理配置连接池、正确处理事务、实施监控和日志记录,可以构建一个健壮的数据库访问层。要根据实际应用场景和负载情况,不断调整和优化配置参数。无论是使用依赖注入还是直接使用数据库会话,都要确保正确管理数据库资源,处理异常情况,并遵循最佳实践。

11. 高并发和巨量数据处理

11.1 高并发优化

11.1.1 连接池配置
from sqlalchemy import create_engine
import multiprocessing# 计算最优连接池大小
CPU_COUNT = multiprocessing.cpu_count()
POOL_SIZE = CPU_COUNT * 2 + 1  # 基础连接数
MAX_OVERFLOW = POOL_SIZE * 2   # 最大溢出连接数# 高并发场景的连接池配置
engine = create_engine(DATABASE_URL,poolclass=QueuePool,pool_size=POOL_SIZE,max_overflow=MAX_OVERFLOW,pool_timeout=30,pool_recycle=1800,pool_pre_ping=True,# 设置 echo_pool 来监控连接池状态echo_pool=True
)# 连接池监控
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):logger.info(f"Connection pool size: {engine.pool.size()}")logger.info(f"Current checked out connections: {engine.pool.checkedin()}")
11.1.2 异步查询处理
from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.future import select# 异步数据库引擎
async_engine = create_async_engine("mysql+asyncmy://user:pass@localhost/db",pool_size=20,max_overflow=30
)async def get_async_session():async with AsyncSession(async_engine) as session:yield session# 异步查询示例
async def async_get_users(skip: int = 0, limit: int = 100):async with AsyncSession(async_engine) as session:query = select(User).offset(skip).limit(limit)result = await session.execute(query)return result.scalars().all()# API 实现
@router.get("/users/async")
async def get_users_async(background_tasks: BackgroundTasks,skip: int = 0, limit: int = 100
):# 将耗时操作放入后台任务background_tasks.add_task(process_user_data)return await async_get_users(skip, limit)
11.1.3 并发控制
from fastapi import HTTPException
from asyncio import Semaphore
import asyncio# 限制并发请求数
MAX_CONCURRENT_REQUESTS = 100
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)async def controlled_operation():async with semaphore:# 执行数据库操作pass# 使用 Redis 实现分布式锁
from redis import Redis
from redis.lock import Lockredis_client = Redis(host='localhost', port=6379, db=0)def with_distributed_lock(lock_name: str, timeout: int = 10):def decorator(func):async def wrapper(*args, **kwargs):lock = Lock(redis_client, lock_name, timeout=timeout)if not lock.acquire(blocking=True, blocking_timeout=5):raise HTTPException(status_code=423,detail="Resource is locked")try:return await func(*args, **kwargs)finally:lock.release()return wrapperreturn decorator# 使用示例
@router.post("/users/batch")
@with_distributed_lock("create_users_lock")
async def create_users_batch(users: List[UserCreate]):return await batch_create_users(users)

11.2 巨量数据处理

11.2.1 分批处理
from typing import List, Generator
import pandas as pdclass BatchProcessor:def __init__(self, batch_size: int = 1000):self.batch_size = batch_sizeself.galera_manager = GaleraSessionManager()def process_large_dataset(self, data: List[dict]):"""分批处理大数据集"""for batch in self._get_batches(data):with self.galera_manager.get_write_db() as db:with GaleraTransaction.atomic(db):self._process_batch(db, batch)def _get_batches(self, data: List[dict]) -> Generator[List[dict], None, None]:"""将数据集分割成小批次"""for i in range(0, len(data), self.batch_size):yield data[i:i + self.batch_size]def _process_batch(self, db: Session, batch: List[dict]):"""处理单个批次的数据"""try:# 使用 bulk_insert_mappings 进行批量插入db.bulk_insert_mappings(User, batch)db.flush()except Exception as e:logger.error(f"Error processing batch: {str(e)}")raise# 使用示例
@router.post("/users/bulk")
async def bulk_create_users(background_tasks: BackgroundTasks,users: List[UserCreate]
):processor = BatchProcessor(batch_size=1000)# 将大量数据处理放入后台任务background_tasks.add_task(processor.process_large_dataset, users)return {"message": "Bulk processing started"}
11.2.2 流式处理
from fastapi.responses import StreamingResponse
import csv
import ioclass StreamProcessor:CHUNK_SIZE = 1000@staticmethodasync def stream_large_query(query):"""流式处理大型查询结果"""with SessionLocal() as db:result_proxy = db.execute(query)while True:chunk = result_proxy.fetchmany(StreamProcessor.CHUNK_SIZE)if not chunk:breakyield chunk@staticmethodasync def export_large_dataset():"""导出大型数据集"""output = io.StringIO()writer = csv.writer(output)# 写入表头writer.writerow(["id", "email", "username", "created_at"])query = "SELECT id, email, username, created_at FROM users"async for chunk in StreamProcessor.stream_large_query(query):for row in chunk:writer.writerow(row)output.seek(0)data = output.read()output.seek(0)output.truncate(0)yield data# API 实现
@router.get("/users/export")
async def export_users():return StreamingResponse(StreamProcessor.export_large_dataset(),media_type="text/csv",headers={"Content-Disposition": "attachment; filename=users.csv"})
11.2.3 分区表处理
from sqlalchemy import Table, Column, Integer, String, DateTime
from sqlalchemy.schema import CreateTable
import datetimedef create_partition_table(year: int, month: int):"""创建分区表"""partition_name = f"users_{year}_{month:02d}"# 创建分区表partition = Table(partition_name,Base.metadata,Column('id', Integer, primary_key=True),Column('email', String(255)),Column('username', String(50)),Column('created_at', DateTime),postgresql_partition_by='RANGE (created_at)',)# 设置分区范围start_date = datetime.datetime(year, month, 1)if month == 12:end_date = datetime.datetime(year + 1, 1, 1)else:end_date = datetime.datetime(year, month + 1, 1)partition_sql = f"""CREATE TABLE {partition_name}PARTITION OF usersFOR VALUES FROM ('{start_date}') TO ('{end_date}')"""return partition_sqlclass PartitionManager:def __init__(self):self.engine = create_engine(DATABASE_URL)def ensure_partition_exists(self, date: datetime.datetime):"""确保指定日期的分区存在"""year = date.yearmonth = date.monthpartition_name = f"users_{year}_{month:02d}"# 检查分区是否存在exists_query = f"""SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{partition_name}')"""with self.engine.connect() as conn:exists = conn.execute(exists_query).scalar()if not exists:# 创建新分区partition_sql = create_partition_table(year, month)conn.execute(partition_sql)# 使用示例
@router.post("/users/partitioned")
async def create_user_partitioned(user: UserCreate):partition_manager = PartitionManager()# 确保当前月份的分区存在partition_manager.ensure_partition_exists(datetime.datetime.now())# 创建用户with galera_manager.get_write_db() as db:with GaleraTransaction.atomic(db):db_user = User(**user.dict())db.add(db_user)return db_user
11.2.4 查询优化
from sqlalchemy import text
from sqlalchemy.orm import joinedload, selectinloadclass QueryOptimizer:def __init__(self, db: Session):self.db = dbdef optimize_large_table_query(self):"""优化大表查询"""# 1. 使用索引提示query = text("""SELECT /*+ INDEX(users email_idx) */FROM usersWHERE email LIKE :pattern""")# 2. 使用延迟加载query = self.db.query(User).options(selectinload(User.orders))# 3. 使用覆盖索引query = self.db.query(User.id, User.email, User.username).filter(User.is_active == True)# 4. 分页优化query = self.db.query(User).filter(User.id > last_id).order_by(User.id).limit(100)return querydef analyze_query(self, query):"""分析查询性能"""explain_sql = f"EXPLAIN ANALYZE {query}"return self.db.execute(explain_sql)# 使用示例
@router.get("/users/optimized")
async def get_users_optimized(db: Session = Depends(get_db),last_id: int = 0,limit: int = 100
):optimizer = QueryOptimizer(db)query = optimizer.optimize_large_table_query()return query.all()

11.3 性能监控

import time
from functools import wraps
from prometheus_client import Counter, Histogram# Prometheus 指标
DB_OPERATION_DURATION = Histogram('db_operation_duration_seconds','Database operation duration',['operation_type']
)
DB_OPERATION_ERRORS = Counter('db_operation_errors_total','Database operation errors',['operation_type']
)def monitor_db_operation(operation_type: str):def decorator(func):@wraps(func)async def wrapper(*args, **kwargs):start_time = time.time()try:result = await func(*args, **kwargs)duration = time.time() - start_timeDB_OPERATION_DURATION.labels(operation_type=operation_type).observe(duration)return resultexcept Exception as e:DB_OPERATION_ERRORS.labels(operation_type=operation_type).inc()raisereturn wrapperreturn decorator# 使用示例
@router.get("/users/monitored")
@monitor_db_operation("get_users")
async def get_users_monitored(skip: int = 0, limit: int = 100):return await async_get_users(skip, limit)

11.4 最佳实践建议

  1. 高并发处理

    • 使用适当的连接池大小
    • 实现请求限流
    • 使用异步操作
    • 实现分布式锁
  2. 大数据处理

    • 实现批量处理
    • 使用流式处理
    • 实现数据分区
    • 优化查询性能
  3. 性能监控

    • 监控数据库操作
    • 记录性能指标
    • 实现告警机制
  4. 扩展性考虑

    • 实现水平扩展
    • 使用读写分离
    • 实现缓存策略

7. 总结

良好的数据库配置对于 FastAPI 应用的性能和可靠性至关重要。通过合理配置连接池、正确处理事务、实施监控和日志记录,可以构建一个健壮的数据库访问层。要根据实际应用场景和负载情况,不断调整和优化配置参数。无论是使用依赖注入还是直接使用数据库会话,都要确保正确管理数据库资源,处理异常情况,并遵循最佳实践。

12. 水平扩展、读写分离和缓存策略

12.1 水平扩展实现

from typing import List, Optional
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import randomclass DatabaseCluster:"""数据库集群管理器"""def __init__(self):# 主数据库配置self.master_db = {"url": "mysql+pymysql://user:pass@master:3306/db","engine": None,"session_maker": None}# 从数据库配置列表self.slave_dbs = [{"url": "mysql+pymysql://user:pass@slave1:3306/db","weight": 1,  # 权重,用于负载均衡"engine": None,"session_maker": None},{"url": "mysql+pymysql://user:pass@slave2:3306/db","weight": 1,"engine": None,"session_maker": None}]self._initialize_connections()def _initialize_connections(self):"""初始化所有数据库连接"""# 初始化主库连接self.master_db["engine"] = create_engine(self.master_db["url"],pool_size=5,max_overflow=10,pool_timeout=30)self.master_db["session_maker"] = sessionmaker(bind=self.master_db["engine"])# 初始化从库连接for slave in self.slave_dbs:slave["engine"] = create_engine(slave["url"],pool_size=10,max_overflow=20,pool_timeout=30)slave["session_maker"] = sessionmaker(bind=slave["engine"])def get_master_session(self):"""获取主库会话"""return self.master_db["session_maker"]()def get_slave_session(self):"""获取从库会话(带权重的随机选择)"""total_weight = sum(slave["weight"] for slave in self.slave_dbs)r = random.uniform(0, total_weight)current_weight = 0for slave in self.slave_dbs:current_weight += slave["weight"]if r <= current_weight:return slave["session_maker"]()# 默认返回第一个从库return self.slave_dbs[0]["session_maker"]()# 使用示例
db_cluster = DatabaseCluster()class UserService:"""用户服务,演示读写分离"""@staticmethoddef get_user_by_id(user_id: int) -> Optional[User]:"""读操作:从从库读取"""session = db_cluster.get_slave_session()try:return session.query(User).filter(User.id == user_id).first()finally:session.close()@staticmethoddef create_user(user_data: dict) -> User:"""写操作:在主库写入"""session = db_cluster.get_master_session()try:user = User(**user_data)session.add(user)session.commit()return userfinally:session.close()

12.2 读写分离策略

from contextlib import contextmanager
from typing import Generator
from sqlalchemy.orm import Sessionclass ReadWriteManager:"""读写分离管理器"""def __init__(self):self.db_cluster = DatabaseCluster()@contextmanagerdef read_session(self) -> Generator[Session, None, None]:"""获取读会话(从从库)"""session = self.db_cluster.get_slave_session()try:yield sessionfinally:session.close()@contextmanagerdef write_session(self) -> Generator[Session, None, None]:"""获取写会话(从主库)"""session = self.db_cluster.get_master_session()try:yield sessionsession.commit()except Exception:session.rollback()raisefinally:session.close()# 服务层实现
class EnhancedUserService:def __init__(self):self.db_manager = ReadWriteManager()def get_users(self, skip: int = 0, limit: int = 100) -> List[User]:"""读操作示例"""with self.db_manager.read_session() as session:return session.query(User).offset(skip).limit(limit).all()def create_user(self, user_data: dict) -> User:"""写操作示例"""with self.db_manager.write_session() as session:user = User(**user_data)session.add(user)return userdef update_user(self, user_id: int, user_data: dict) -> Optional[User]:"""复杂操作示例:先读后写"""# 首先从从库读取with self.db_manager.read_session() as read_session:user = read_session.query(User).filter(User.id == user_id).first()if not user:return None# 然后在主库更新with self.db_manager.write_session() as write_session:user = write_session.query(User).filter(User.id == user_id).first()for key, value in user_data.items():setattr(user, key, value)return user

12.3 缓存策略实现

from functools import wraps
from typing import Any, Optional
import json
from redis import Redis
from datetime import timedeltaclass CacheManager:"""缓存管理器"""def __init__(self):self.redis_client = Redis(host='localhost',port=6379,db=0,decode_responses=True)self.default_ttl = timedelta(minutes=30)def get(self, key: str) -> Optional[str]:"""获取缓存"""return self.redis_client.get(key)def set(self, key: str, value: Any, ttl: Optional[timedelta] = None):"""设置缓存"""if ttl is None:ttl = self.default_ttlif isinstance(value, (dict, list)):value = json.dumps(value)self.redis_client.set(key, value, ex=int(ttl.total_seconds()))def delete(self, key: str):"""删除缓存"""self.redis_client.delete(key)def clear_pattern(self, pattern: str):"""清除匹配模式的所有缓存"""keys = self.redis_client.keys(pattern)if keys:self.redis_client.delete(*keys)# 缓存装饰器
def cache_result(prefix: str,ttl: Optional[timedelta] = None,key_func = None
):"""缓存装饰器:param prefix: 缓存键前缀:param ttl: 缓存过期时间:param key_func: 自定义缓存键生成函数"""cache_manager = CacheManager()def decorator(func):@wraps(func)async def wrapper(*args, **kwargs):# 生成缓存键if key_func:cache_key = f"{prefix}:{key_func(*args, **kwargs)}"else:# 默认使用参数作为缓存键key_parts = [str(arg) for arg in args]key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))cache_key = f"{prefix}:{':'.join(key_parts)}"# 尝试从缓存获取cached_result = cache_manager.get(cache_key)if cached_result:return json.loads(cached_result)# 执行原函数result = await func(*args, **kwargs)# 存入缓存cache_manager.set(cache_key, result, ttl)return resultreturn wrapperreturn decorator# 使用示例
class CachedUserService:def __init__(self):self.db_manager = ReadWriteManager()self.cache_manager = CacheManager()@cache_result(prefix="user", ttl=timedelta(minutes=10))async def get_user(self, user_id: int) -> Optional[dict]:"""带缓存的用户查询"""with self.db_manager.read_session() as session:user = session.query(User).filter(User.id == user_id).first()if user:return user.to_dict()return Noneasync def update_user(self, user_id: int, user_data: dict) -> Optional[dict]:"""更新用户并清除缓存"""with self.db_manager.write_session() as session:user = session.query(User).filter(User.id == user_id).first()if not user:return Nonefor key, value in user_data.items():setattr(user, key, value)# 清除相关缓存self.cache_manager.delete(f"user:{user_id}")# 清除可能包含该用户的列表缓存self.cache_manager.clear_pattern("user_list:*")return user.to_dict()@cache_result(prefix="user_list", ttl=timedelta(minutes=5))async def get_user_list(self,skip: int = 0,limit: int = 100,status: Optional[str] = None) -> List[dict]:"""带缓存的用户列表查询"""with self.db_manager.read_session() as session:query = session.query(User)if status:query = query.filter(User.status == status)users = query.offset(skip).limit(limit).all()return [user.to_dict() for user in users]# FastAPI 路由实现
@router.get("/users/{user_id}")
async def get_user(user_id: int):service = CachedUserService()user = await service.get_user(user_id)if not user:raise HTTPException(status_code=404, detail="User not found")return user@router.put("/users/{user_id}")
async def update_user(user_id: int, user_update: UserUpdate):service = CachedUserService()user = await service.update_user(user_id, user_update.dict())if not user:raise HTTPException(status_code=404, detail="User not found")return user@router.get("/users/")
async def get_users(skip: int = 0,limit: int = 100,status: Optional[str] = None
):service = CachedUserService()return await service.get_user_list(skip, limit, status)

12.4 最佳实践建议

  1. 水平扩展注意事项

    • 合理配置主从数据库的连接池大小
    • 实现故障转移机制
    • 监控数据库负载和性能
    • 定期检查主从同步状态
  2. 读写分离策略

    • 明确区分读写操作
    • 处理读写延迟问题
    • 实现事务一致性
    • 合理分配读写流量
  3. 缓存策略

    • 选择合适的缓存粒度
    • 设置合理的过期时间
    • 及时清除相关缓存
    • 处理缓存击穿和雪崩问题
  4. 性能优化

    • 使用连接池
    • 实现请求队列
    • 添加监控和告警
    • 定期优化数据库
  5. 可用性保障

    • 实现健康检查
    • 添加重试机制
    • 做好容错处理
    • 实现优雅降级

12.5 使用建议

  1. 水平扩展场景

    • 读多写少的业务
    • 高并发查询场景
    • 需要数据备份的场景
    • 地理分布式部署
  2. 读写分离使用场景

    • 读写比例失衡
    • 需要提高读取性能
    • 数据一致性要求不高
    • 复杂查询较多
  3. 缓存策略使用场景

    • 热点数据访问
    • 计算密集型查询
    • 接口响应优化
    • 减少数据库压力
  4. 注意事项

    • 定期检查系统性能
    • 监控资源使用情况
    • 做好数据备份
    • 制定应急预案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/66749.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度解析 Java 的幻读现象与应对策略

目录 一、幻读现象的本质 二、幻读在 Java 数据库编程中的体现 三、幻读带来的问题 四、应对幻读的策略 1. 数据库隔离级别 2. 应用层解决方案 五、总结 在 Java 的数据库编程领域&#xff0c;幻读是一个不容忽视的概念。它涉及到数据库事务处理过程中数据一致性的关键问…

Glary Utilities Pro 多语便携版系统优化工具 v6.21.0.25

Glary Utilities是一款功能强大的系统优化工具软件&#xff0c;旨在帮助用户清理计算机垃圾文件、修复系统错误、优化系统性能等。 软件功能 清理和修复&#xff1a;可以清理系统垃圾文件、无效注册表项、无效快捷方式等&#xff0c;修复系统错误和蓝屏问题。 优化和加速&…

【贪心算法】洛谷P1106 - 删数问题

2025 - 12 - 26 - 第 46 篇 【洛谷】贪心算法题单 - 【贪心算法】 - 【学习笔记】 作者(Author): 郑龙浩 / 仟濹(CSND账号名) 目录 文章目录 目录P1106 删数问题题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示思路代码 P1106 删数问题 题目描述 键盘输入一个高…

Oracle 创建并使用外部表

目录 一. 什么是外部表二. 创建外部表所在的文件夹对象三. 授予访问外部表文件夹的权限3.1 DBA用户授予普通用户访问外部表文件夹的权限3.2 授予Win10上的Oracle用户访问桌面文件夹的权限 四. 普通用户创建外部表五. 查询六. 删除 一. 什么是外部表 在 Oracle 数据库中&#x…

基于FPGA的BPSK+costas环实现,包含testbench,分析不同信噪比对costas环性能影响

目录 1.算法仿真效果 2.算法涉及理论知识概要 3.Verilog核心程序 4.完整算法代码文件获得 1.算法仿真效果 本作品是之前作品的改进和扩展&#xff1a; 1.m基于FPGA的BPSK调制解调通信系统verilog实现,包含testbench,包含载波同步_csdn基于fpga的bpsk-CSDN博客 2.m基于FP…

Linux 目录操作详解

Linux目录操作详解 1. 获取当前工作目录1.1 getcwd()1.2 get_current_dir_name() 2. 切换工作目录2.1 chdir() 3. 创建和删除目录3.1 mkdir()3.2 rmdir() 4. 获取目录中的文件列表4.1 opendir() 打开目录4.2 readdir() 读取目录内容4.3 closedir() 关闭目录 5. dirent 结构体6.…

Spring 依赖注入详解:创建 Bean 和注入依赖是一回事吗?

1. 什么是依赖注入&#xff08;Dependency Injection&#xff0c;DI&#xff09;&#xff1f; 依赖注入 是 Spring IoC&#xff08;控制反转&#xff09;容器的核心功能。它的目标是将对象的依赖&#xff08;如其他对象或配置&#xff09;从对象本身中剥离&#xff0c;由容器负…

AI时代的网络安全:传统技术的落寞与新机遇

AI时代的网络安全&#xff1a;传统技术的落寞与新机遇 在AI技术飞速发展的浪潮中&#xff0c;网络安全领域正经历着前所未有的变革。一方面&#xff0c;传统网络安全技术在面对新型攻击手段时逐渐显露出局限性&#xff1b;另一方面&#xff0c;AI为网络安全带来了新的机遇&…

后端开发Web

Maven Maven是apache旗下的一个开源项目&#xff0c;是一款用于管理和构建java项目的工具 Maven的作用 依赖管理 方便快捷的管理项目依赖的资源&#xff08;jar包&#xff09;&#xff0c;避免版本冲突问题 统一项目结构 提供标准、统一的项目结构 项目构建 标准跨平台(…

前沿技术趋势洞察:2024年技术的崭新篇章与未来走向!

引言 时光飞逝&#xff0c;2024年已经来临&#xff0c;回顾过去一年&#xff0c;科技的迅猛进步简直让人目不暇接。 在人工智能&#xff08;AI&#xff09;越来越强大的今天&#xff0c;我们不再停留在幻想阶段&#xff0c;量子计算的雏形开始展示它的无穷潜力&#xff0c;Web …

【10.2】队列-设计循环队列

一、题目 设计你的循环队列实现。 循环队列是一种线性数据结构&#xff0c;其操作表现基于 FIFO&#xff08;先进先出&#xff09;原则并且队尾被连接在队首之后以形成一个循环。它也被称为“环形缓冲器”。 循环队列的一个好处是我们可以利用这个队列之前用过的空间。在一个普…

博客之星2024年度总评选——我的年度创作回顾与总结

2024年&#xff0c;是我在CSDN博客上持续耕耘、不断成长的一年。在此&#xff0c;与大家分享一下我的年度创作回顾与总结。 一、创作成长与突破 在人工智能领域&#xff0c;技术迭代迅速&#xff0c;知识更新频繁。为了保持自己的竞争力&#xff0c;在今年&#xff0c;我始终…

IDEA运行Java项目总会报程序包xxx不存在

我的在另外一台电脑上跑是没有问题的&#xff0c;在新的电脑上跑的时候&#xff0c;又出现了这个恶心的问题...... 思来想去&#xff0c;唯一的问题就是我的mavn环境没的配置好 如何在本地部署mavn环境&#xff0c;这里推荐一篇很好的文章&#xff1a; Maven安装与配置&…

java 根据前端传回的png图片数组,后端加水印加密码生成pdf,返回给前端

前端传回的png图片数组&#xff0c;后端加水印加密码生成pdf&#xff0c;返回给前端 场景&#xff1a;重点&#xff1a;maven依赖controllerservice 场景&#xff1a; 当前需求&#xff0c;前端通过html2canvas将页面报表生成图片下载&#xff0c;可以仍然不满意。 需要java后…

数据分库分表和迁移方案

在我们业务快速发展的过程中&#xff0c;数据量必然也会迎来突飞猛涨。那么当我们的数据量百倍、千倍、万倍、亿倍增长后&#xff0c;原有的单表性能就不能满足我们日常的查询和写入了&#xff0c;此时数据架构就不得不进行拆分&#xff0c;比如单表拆分成10张表、100张表、单个…

线上突发:MySQL 自增 ID 用完,怎么办?

线上突发&#xff1a;MySQL 自增 ID 用完&#xff0c;怎么办&#xff1f; 1. 问题背景2. 场景复现3. 自增id用完怎么办&#xff1f;4. 总结 1. 问题背景 最近&#xff0c;我们在数据库巡检的时候发现了一个问题&#xff1a;线上的地址表自增主键用的是int类型。随着业务越做越…

[Java] Solon 框架的三大核心组件之一插件扩展体系

1、Solon 的三大核心组件 核心组件说明Plugin 插件扩展机制提供“编码风格”的扩展体系Ioc/Aop 应用容器提供基于注入依赖的自动装配体系ContextHandler 通用上下文处理接口提供“开放式处理”适配体系&#xff08;俗称&#xff0c;三元合一&#xff09; 2、Solon Plugin 插件…

TRELLIS微软的图生3D

TRELLIS 教程目录&#xff1a; Youtube&#xff1a;https://www.youtube.com/watch?vJqFHZ-dRMhI 官网地址&#xff1a;https://trellis3d.github.io/ GitHub&#xff1a;https://github.com/Microsoft/TRELLIS 部署目录&#xff1a; 克隆项目 git clone --recurse-submodul…

Java导出通过Word模板导出docx文件并通过QQ邮箱发送

一、创建Word模板 {{company}}{{Date}}服务器运行情况报告一、服务器&#xff1a;总告警次数&#xff1a;{{ServerTotal}} 服务器IP:{{IPA}}&#xff0c;总共告警次数:{{ServerATotal}} 服务器IP:{{IPB}}&#xff0c;总共告警次数:{{ServerBTotal}} 服务器IP:{{IPC}}&#x…

【22】Word:小李-高新技术企业政策❗

目录 题目​ NO1.2 NO3 NO4 NO5.6 NO7.8 NO9.10 若文章中存在删除空白行等要求&#xff0c;可以到最后来完成。注意最后一定要检查此部分&#xff01;注意&#xff1a;大多是和事例一样即可&#xff0c;不用一摸一样&#xff0c;但也不要差太多。 题目 NO1.2 F12Fn&a…