源码见:"fastapi_study_road-learning_system_online_courses: fastapi框架实战之--在线课程学习系统"
接上一篇文章FastAPI(六十六)实战开发《在线课程学习系统》接口开发--用户注册接口开发。这次我们分享实际开发--用户登陆接口开发。
我们先来梳理下逻辑:
1.查询用户是否存在
2.校验密码是否正确
3.密码校验失败记录失败次数
4.60分钟内失败次数大于等于3次,60分钟内不能登陆
5.密码校验通过产生对应的token返回
接着我们去设置pydantic登录参数校验模型,同样添加到user_schemas.py中
class UserLogin(UserBase):"""登录校验模型"""password: str = Field(min_length=8, max_length=16)
这里我们继承的是之前的UserBase。
对应操作数据库查询用户的逻辑我们使用之前注册的时候使用的get_by_username即可。
我们把密码输入失败和token的值放在redis中,那么redis对应的配置,我们在搭建架构时已经配置好了,都放在了.env中:
ENV = "DEV"# mysql
MYSQL_HOST = "10.30.10.36"
MYSQL_PORT = 3306
MYSQL_USERNAME = "root"
MYSQL_PASSWORD = "123456"
MYSQL_DB_DEV = "learn_onsite_system_dev"
MYSQL_DB_TEST = "learn_onsite_system_test"
MYSQL_DB_PRO = "learn_onsite_system_pro"# redis
REDIS_HOST = "10.30.10.36"
REDIS_PORT = "6379"
REDIS_DB = "0"
而且redis初始化相关逻辑之前我是放在了mian.py主文件中,今天我将其单独提取出来维护
"""
-*- encoding=utf-8 -*-
Time: 2024/7/22 16:02
Author: lc
Email: 15101006331@163.com
File: redis.py
"""
from aioredis import Redis, create_redis_pool
from settings.config import REDIS_CONFIGasync def create_redis() -> Redis:return await create_redis_pool(f"redis://:@{REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}/{REDIS_CONFIG['db']}?encoding=utf-8")
再将其导入到main.py中
from middlewares.redis import create_redis@app.on_event("startup")
async def startup_event():app.state.redis = await create_redis()print("init redis success")create_tables()print("init database success")init_roles()print("init roles success")@app.on_event("shutdown")
async def shutdown_event():app.state.redis.close()await app.state.redis.wait_closed()print("redis closed")
我们把token相关配置也配置进去
ENV = "DEV"# mysql
MYSQL_HOST = "10.30.10.36"
MYSQL_PORT = 3306
MYSQL_USERNAME = "root"
MYSQL_PASSWORD = "123456"
MYSQL_DB_DEV = "learn_onsite_system_dev"
MYSQL_DB_TEST = "learn_onsite_system_test"
MYSQL_DB_PRO = "learn_onsite_system_pro"# redis
REDIS_HOST = "10.30.10.36"
REDIS_PORT = "6379"
REDIS_DB = "0"# TOKEN
SECRET_KEY = "08d25e094faa6ca2556c819756bhj9563b93f7099f6f0f4xxd6cf93b33e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
那么产生token的代码如何实现呢
from jose import JWTError, jwt
from settings.config import TOKEN_CONFIGdef create_access_token(data: dict):"""产生token"""to_encode = data.copy()encoded_jwt = jwt.encode(to_encode, TOKEN_CONFIG["secret_key"], algorithm=TOKEN_CONFIG["algorithm"])return encoded_jwt
接下来就是根据逻辑去实现具体的登录逻辑了,在user_method.py中增加如下方法:
async def verify_login(request: Request, user: UserLogin, db: Session):logger.info("登录开始了")db_user = get_by_username(db, user.username)if not db_user:logger.warning(f"用户:’{user.username}‘ 不存在")return response(code=100205, message="用户不存在")verify = verify_password(user.password, db_user.password)if verify:redis_user = await request.app.state.redis.get(user.username)if not redis_user:try:token = create_access_token(data={"sub": user.username})except:logger.warning(f"method verify_login error: {format_exc()}")return response(code=100203, message="生产token失败")await request.app.state.redis.set(user.username, token, expire=TOKEN_CONFIG["access_token_expire_time"])return response()return response(code=100202, message="重复登录")else:error_key = user.username + "_password"result = await request.app.state.redis.hgetall(error_key, encoding="utf-8")# 没有查到认为是第一次出现错误,将次数设置为1,时间设置为当前时间if not result:current_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")await request.app.state.redis.hmset_dict(error_key, num=1, time=current_time)return response(code=100206, message="密码错误")# 查到则不是第一次,要分多重情况else:error_num = int(result["num"])num_time = (datetime.now() - datetime.strptime(result["time"], "%Y-%m-%d %H:%M:%S")).seconds / 60# 60分钟内错误没达到3次,错误次数加1if error_num < 3 and num_time < 60:error_num += 1await request.app.state.redis.hmset_dict(error_key, num=error_num)return response(code=100206, message="密码错误")# 超60分钟没有达到3次,错误次数重置为1,时间设置为当前elif error_num < 3 and num_time > 60:error_num = 1num_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")await request.app.state.redis.hmset_dict(error_key, num=error_num, time=num_time)return response(code=100206, message="密码错误")# 60分钟内错误超过3次,错误次数加1,限制60分钟内不可以登录elif error_num >= 3 and num_time < 60:error_num += 1await request.app.state.redis.hmset_dict(error_key, num=error_num)return response(code=100204, message="输入密码错误次数过多,账号暂时锁定,请60分钟后再来登录")# 超60分钟,如果再次输错,将错误次数重置为1,时间设置为当前时间else:error_num = 1num_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")await request.app.state.redis.hmset_dict(error_key, num=error_num, time=num_time)return response(code=100206, message="密码错误")
接下来,在user.py中增加我们的登录接口
@user_router.post("/login", summary="登录")
async def login(request: Request, user: UserLogin, db: Session = Depends(create_db)):return await verify_login(request, user, db)
测试:
①:成功
②:密码错误
③:60分钟内连续3次登录错误
至此,我们的登录接口就完成了