文章目录
- 相关术语
- 登录逻辑
- 登录设计
- 登录代码
相关术语
调用接口[wx.login()]获取登录凭证(code)。通过凭证进而换取用户登录态信息,包括用户在当前小程序的唯一标识(openid)、微信开放平台账号下的唯一标识(unionid,若当前小程序已绑定到微信开放平台账号)及本次登录的会话密钥(session_key)等。
临时登录凭证 code 只能使用一次。
如果开发者拥有多个移动应用、网站应用、和公众账号(包括小程序),可通过 UnionID 来区分用户的唯一性,因为只要是同一个微信开放平台账号下的移动应用、网站应用和公众账号(包括小程序),用户的 UnionID 是唯一的。换句话说,同一用户,对同一个微信开放平台下的不同应用,UnionID是相同的。
目前微信登录无法拿到微信昵称与头像,需要自己处理该逻辑。
登录逻辑
登录逻辑主要分两步:
首先这是对应的逻辑设计图:
首先由前端生成一个code,这个code 需要返回给后端,所以后端需要拿到一个必须得一个code 参数,
后端拿到这个code 需要去拿到对应小程序的三个appid,appsecret (这两个是小程序注册时提供),code(前端传参)去请求API:
GET https://api.weixin.qq.com/sns/jscode2session?appid=APPID&secret=SECRET&js_code=JSCODE&grant_type=authorization_code
请求参数
属性 | 类型 | 必填 | 说明 |
---|---|---|---|
appid | string | 是 | 小程序 appId |
secret | string | 是 | 小程序 appSecret |
js_code | string | 是 | 登录时获取的 code,可通过wx.login获取 |
grant_type | string | 是 | 授权类型,此处只需填写 authorization_code |
返回参数
属性 | 类型 | 说明 |
---|---|---|
session_key | string | 会话密钥 |
unionid | string | 用户在开放平台的唯一标识符,若当前小程序已绑定到微信开放平台账号下会返回,详见 UnionID 机制说明。 |
errmsg | string | 错误信息 |
openid | string | 用户唯一标识 |
errcode | int32 | 错误码 |
后端现在通过API拿到如上的参数,需要返回给前端对应的session_key,unionid(或者openid)这一步相当于前端使用 code 换取 openid、unionid、session_key 等信息。
现在如果需要通过这个信息拿到手机号还是需要进行第二步的处理。
前端拿到这个信息后,需要经过一系列操作返回给后端三个参数。
data.encryptedData, data.Iv, data.session_key
后端拿到这三个参数并可以解密手机号,该加密与解密思路如下:
AES对称加密算法,并且采用了 AES CBC(Cipher Block Chaining)模式。[后面会详细谈]
至此一个登录逻辑已经完成,但这里只是讲的是微信登录逻辑,下面进入到设计模块。
登录设计
任何一个登录模块都少不了数据库,所以这里还需要结合数据库JWT等进行讲述。
首先是后端设计的表,本次使用django 框架进行设计表,除了django自带的User表以外,需要再设计一个user_expand表,但由于可以做一个兼容,可以把三方登录信息在再单独做一个表出来,如下:
三方登陆信息表:
字段名 | 类型 | 说明 |
---|---|---|
id | int | 【主键】 |
user_id | int | 【逻辑外键】关联用户扩展表id |
platform | string | 用户来源 |
platform_unique_id | string | 新增字段,微信的就是openid,谷歌的就是用户的邮箱,其他的类似【三方登陆唯一标识】 |
yn | bool |
这里需要把三方登录单独拿出来,因为是一个用户可能有多个三方登录信息,所以需要要设计成一对多的形式。
登录逻辑如下(需要绑定手机号):
1.前端给后端一个code 2.后端通过API 获得openid 3.查找三方表,是否有这个openid ,如果有那么拿到对应的user_id,然后refresh = RefreshToken.for_user(user)
返回给前端一个access_token即可;如果没有查到这个openid,那么需要返回给前端一个信息就是该用户没有绑定过的信息,需要前端返回一个加密向量(以及后端会给前端一个openid)。 4. 后端拿到这个加密向量,解密出这个手机号信息后,把手机号信息以及openid插入到对应数据表中,返回给前端access_token 以表示绑定成功。
登录代码
首先是微信登录使用的一些工具函数:
class WeiXinMiniAppLogin:async def get_three_login_unique_id(self, param: dict):code = param.get("token")login_params = WeiXinMiniAppLoginParams()wx_info = await self.get_wx_miniapp_info(code, await login_params.get_params()) # 获取到wx_info 的信息logger.info(f"wx_info: {wx_info}")return wx_info@staticmethoddef get_login_type():return UserSourceEnum.WEIXINMINIAPP.value@staticmethodasync def get_wx_miniapp_info(code: str, login_params: dict):params = {"appid": login_params.get("appid", ""),"secret": login_params.get("secret", ""),"js_code": code,"grant_type": login_params.get("grant_type", ""),}jscode2session_url = login_params.get("jscode2session_url", "")try:async with aiohttp.ClientSession() as session:async with session.get(jscode2session_url, params=params, ssl=False) as response:if response.status != 200:logger.error(f"WeiXinMiniAppLogin.jscode2session HTTP error: {response.status}")return Nonea = await response.text() # Debugdata = json.loads(a)# print(response)# return data# data = await response.json() # B端的写法是data = json.loads(response.text),异步优先使用这个方法except Exception as e:logger.error(f"WeiXinMiniAppLogin.jscode2session error for HTTP: {e}")return Noneerror_code = data.get("errcode")if error_code:logger.error(f"WeiXinMiniAppLogin.jscode2session error_code: {data}")return Nonesession_key = data.get("session_key")openid = data.get("openid")unionid = data.get("unionid", None) # 注意为空的情况# 如果为空,记录下,本小程序都是空的情况if unionid is None:# logger.error(f"WeiXinMiniAppLogin.jscode2session error: missing unionid in {data}")r_data = {"session_key": session_key,"openid": openid}return r_data# 如果不空的话就返回,实际情况就是只有openidr_data = {"unionid": unionid,"session_key": session_key,"openid": openid}return r_data@staticmethodasync def get_phone_number(encrypted_data, aes_iv, session_key):try:session_key = session_key.replace("\\", "")session_key_bytes = b64decode(session_key)aes_iv_bytes = b64decode(aes_iv)encrypted_data_bytes = b64decode(encrypted_data)cipher = AES.new(session_key_bytes, AES.MODE_CBC, aes_iv_bytes)decrypted_bytes = cipher.decrypt(encrypted_data_bytes)padding_len = decrypted_bytes[-1]decrypted_bytes = decrypted_bytes[:-padding_len]decrypted_str = decrypted_bytes.decode('utf-8')model = json.loads(decrypted_str)phone_number = model.get("phoneNumber", "")return phone_numberexcept Exception as ex:logger.error(f"WeiXinMiniAppLogin.get_phone_number error,encrypted_data:{encrypted_data},aes_iv:{aes_iv},session_key:{session_key},errormsg:{ex}")return None
这里对get_phone_number的解密方法做一些说明:
这个函数使用的是 AES对称加密算法,并且采用了 AES CBC(Cipher Block Chaining)模式。
具体分析:
-
AES (Advanced Encryption Standard):一种对称加密算法,常用于保护数据的安全性。对称加密意味着加密和解密都使用相同的密钥(即这里的 session_key)。
-
AES.MODE_CBC (Cipher Block Chaining 模式):这是AES的一种工作模式,CBC模式是将前一个加密块的密文与当前块进行异或后再进行加密。解密时也要通过相同的初始化向量(IV,即这里的 aes_iv)来解密。
-
b64decode:表示输入的 session_key、aes_iv 和 encrypted_data 是通过 Base64 编码的,在解密之前需要将其从 Base64 格式解码成原始字节流。
-
填充和去除填充:AES加密的数据块必须是固定长度(一般为128位,也就是16字节),如果数据长度不够,就会自动添加填充字符。在解密后,代码通过 padding_len = decrypted_bytes[-1] 获取填充的长度,并通过 decrypted_bytes[:-padding_len] 去除填充。
-
最终数据解密:解密后的数据是一个 JSON 字符串,最后通过 json.loads(decrypted_str) 解析为字典对象,获取其中的 phoneNumber。
下面是登录接口的设计,注意需要构造两个接口,
首先是三方登录回调接口:
前端传入参数:
{"login_type": "string", # 可省略,做一个三方登录标识"token": "string"
}
后端返回参数:
//需要绑定手机号
{"status": 200,"message": "OK","data": {"third_login_unique_id": "wx_XXXXXXXXX","need_binding": true, # 需要绑定手机号"session_key": "some_session_key"}
}
// 之前绑定手机号的
// {
// "status": 200,
// "message": "OK",
// "data": {
// "need_binding": false,
// "refresh": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.",
// "access": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXV"
// }
// }
然后是绑定手机号的逻辑:
前端传入参数:
{"third_login_unique_id": "wx_onwXXXXXX","encryptedData": "xxxx","Iv": "xxxx","session_key": "xxxx"
}
后端返回参数:
{"status": 200,"message": "OK","data": {"refresh": "eyJhbGciOiJIUzI1N","access": "eyJhbGciO"}
}
@api_controller('user/', tags=['login'], permissions=[])
class LoginController:@http_post('third_login/, response=ThirdLoginBResponse)async def third_login(self, data: ThirdLoginBRequest):# 获取登录类型login_type = data.login_type # weixinminiappthird_login_instances = third_login_initializer.get_instance(login_type)if third_login_instances is None:raise HttpError(status_code=400, detail="third_login_instances is None")# 获取微信信息wx_info = await third_login_instances.get_three_login_unique_id({"token": data.token})if wx_info is None:raise HttpError(status_code=400, detail="three_login_unique_id is None")# 获取 openid 和 unionidopenid = wx_info.get("openid")unionid = wx_info.get("unionid", None)logger.info(f"ThirdLoginBView.post openid:{openid},unionid:{unionid},login_type:{login_type}")old_third_login_unique_id = f"wx_{openid}" # 小程序只有openid无法拿到unionid# new_third_login_unique_id = f"wx_{unionid}" if unionid else old_third_login_unique_id# 先从三方表里面去查这个用户user_social_account = await UserSocialAccount.objects.filter(platform_unique_id=old_third_login_unique_id,platform=login_type).afirst()User = get_user_model()# 如果用户存在,直接返回对应的token值if user_social_account:user_id = user_social_account.user_iduser = await User.objects.aget(id=user_id)refresh = RefreshToken.for_user(user)response_data = {"status": 200,"message": "OK","data": {"third_login_unique_id": old_third_login_unique_id, # 只有openid"need_binding": False,"refresh": str(refresh),"access": str(refresh.access_token),}}response_data["data"]["session_key"] = wx_info.get("session_key") # 调试解密,该参数只有在需要绑定的时候返回,这里做个测试保留else:# 如果用户不存在,返回需要绑定的信息response_data = {"status": 200,"message": "OK","data": {"third_login_unique_id": old_third_login_unique_id,"need_binding": True}}if login_type == UserSourceEnum.WEIXINMINIAPP.value:response_data["data"]["session_key"] = wx_info.get("session_key")return ThirdLoginBResponse(**response_data)@http_post('wx_binding_phone/', response=WXBindingPhoneResponse)async def wx_binding_phone(self, data: WXBindingPhoneRequest):# logger.info("ThirdLoginView.post start:" + json.dumps(data.dict(), ensure_ascii=False))# 解密获取手机号wx_mini_app_login = WeiXinMiniAppLogin()phone_number = await wx_mini_app_login.get_phone_number(data.encryptedData, data.Iv, data.session_key)if not phone_number:raise HttpError(status_code=400, detail="无法解密获取手机号")# 获取用户模型User = get_user_model()# 查找或创建用户user, created = await User.objects.aget_or_create(username=phone_number)if created:user.set_unusable_password()await user.asave()# 查找或创建用户扩展信息user_expand, created = await UserExpand.objects.aget_or_create(user=user)if created:user_expand.phone = user.usernameuser_expand.nick_name = phone_numberuser_expand.avatar = "https://thirdwx.qlogo.cn/mmopen/vi_32/POgEwh4mIHO4nibH0KlMECNjjGxQUq24ZEaGT4poC6icRiccVGKSyXwibcPq4BWmiaIGuG1icwxaQX6grC9VemZoJ8rg/132"# 缺头像,暂时不处理,昵称就是手机号,目前无法拿到昵称和头像await user_expand.asave()# 创建新的 UserSocialAccount 对象并保存user_social_account = UserSocialAccount(user_id=user.id,platform=UserSourceEnum.WEIXINMINIAPP.value,platform_unique_id=data.third_login_unique_id,)await user_social_account.asave()else:passrefresh = RefreshToken.for_user(user)# 构建响应数据response_data = {"status": 200,"message": "OK","data": {'refresh': str(refresh),'access': str(refresh.access_token),}}return WXBindingPhoneResponse(**response_data)