认证组件
Django原生的authentic组件为我们的用户注册与登录提供了认证功能,十分的简介与强大。同样DRF也为我们提供了认证组件,一起来看看DRF里面的认证组件是怎么为我们工作的!
models.py
# 定义一个用户表和一个保存用户Token的表 class UserInfo(models.Model): username = models.CharField(max_length=16) password = models.CharField(max_length=32) type = models.SmallIntegerField( choices=((1, '普通用户'), (2, 'VIP用户')), default=1 ) class Token(models.Model): user = models.OneToOneField(to='UserInfo') token_code = models.CharField(max_length=128)
15
1
# 定义一个用户表和一个保存用户Token的表
2
3
4
class UserInfo(models.Model):
5
username = models.CharField(max_length=16)
6
password = models.CharField(max_length=32)
7
type = models.SmallIntegerField(
8
choices=((1, '普通用户'), (2, 'VIP用户')),
9
default=1
10
)
11
12
13
class Token(models.Model):
14
user = models.OneToOneField(to='UserInfo')
15
token_code = models.CharField(max_length=128)
url
path('login/', views.LoginView.as_view()),
1
1
path('login/', views.LoginView.as_view()),
views.py
# 视图主要处理用户名、密码是否正确,用户每一次请求都要带着专有token来! import hashlib, time from rest_framework.response import Response from rest_framework.views import APIView def get_random_token(username): """ 根据用户名和时间戳生成随机token :param username: :return: """ timestamp = str(time.time()) m = hashlib.md5(bytes(username, encoding="utf8")) m.update(bytes(timestamp, encoding="utf8")) return m.hexdigest() class LoginView(APIView): """ 校验用户名密码是否正确从而生成token的视图 """ def post(self, request): res = {"code": 0} print(request.data) username = request.data.get("username") password = request.data.get("password") user = models.UserInfo.objects.filter(username=username, password=password).first() if user: # 如果用户名密码正确 token = get_random_token(username) models.Token.objects.update_or_create(defaults={"token_code": token}, user=user) res["token"] = token else: res["code"] = 1 res["error"] = "用户名或密码错误" return Response(res)
38
1
# 视图主要处理用户名、密码是否正确,用户每一次请求都要带着专有token来!
2
import hashlib, time
3
from rest_framework.response import Response
4
from rest_framework.views import APIView
5
6
7
def get_random_token(username):
8
"""
9
根据用户名和时间戳生成随机token
10
:param username:
11
:return:
12
"""
13
timestamp = str(time.time())
14
m = hashlib.md5(bytes(username, encoding="utf8"))
15
m.update(bytes(timestamp, encoding="utf8"))
16
return m.hexdigest()
17
18
19
class LoginView(APIView):
20
"""
21
校验用户名密码是否正确从而生成token的视图
22
"""
23
def post(self, request):
24
res = {"code": 0}
25
print(request.data)
26
username = request.data.get("username")
27
password = request.data.get("password")
28
29
user = models.UserInfo.objects.filter(username=username, password=password).first()
30
if user:
31
# 如果用户名密码正确
32
token = get_random_token(username)
33
models.Token.objects.update_or_create(defaults={"token_code": token}, user=user)
34
res["token"] = token
35
else:
36
res["code"] = 1
37
res["error"] = "用户名或密码错误"
38
return Response(res)
定义认证类model_serializer.py
# 这一步是要对着源码才能写出来 from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed class MyAuth(BaseAuthentication): def authenticate(self, request): if request.method in ["POST", "PUT", "DELETE"]: request_token = request.data.get("token", None) if not request_token: raise AuthenticationFailed('缺少token') token_obj = models.Token.objects.filter(token_code=request_token).first() if not token_obj: raise AuthenticationFailed('无效的token') return token_obj.user.username, None else: return None, None
18
1
# 这一步是要对着源码才能写出来
2
from rest_framework.authentication import BaseAuthentication
3
from rest_framework.exceptions import AuthenticationFailed
4
5
6
class MyAuth(BaseAuthentication):
7
def authenticate(self, request):
8
if request.method in ["POST", "PUT", "DELETE"]:
9
request_token = request.data.get("token", None)
10
if not request_token:
11
raise AuthenticationFailed('缺少token')
12
token_obj = models.Token.objects.filter(token_code=request_token).first()
13
if not token_obj:
14
raise AuthenticationFailed('无效的token')
15
return token_obj.user.username, None
16
else:
17
return None, None
18
全局配置
# 在settings.py中配置 REST_FRAMEWORK = { "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ] }
4
1
# 在settings.py中配置
2
REST_FRAMEWORK = {
3
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ]
4
}
权限组件
只有vip才能看的内容
自定义权限类
# 自定义权限类 from rest_framework.permissions import BasePermission class MyPermission(BasePermission): message = 'VIP用户才能访问' def has_permission(self, request, view): """ 自定义权限只有VIP用户才能访问 """ # 因为在进行权限判断之前已经做了认证判断,所以这里可以直接拿到request.user if request.user and request.user.type == 2: # 如果是VIP用户 return True else: return False
15
1
# 自定义权限类
2
from rest_framework.permissions import BasePermission
3
4
class MyPermission(BasePermission):
5
message = 'VIP用户才能访问'
6
7
def has_permission(self, request, view):
8
"""
9
自定义权限只有VIP用户才能访问
10
"""
11
# 因为在进行权限判断之前已经做了认证判断,所以这里可以直接拿到request.user
12
if request.user and request.user.type == 2: # 如果是VIP用户
13
return True
14
else:
15
return False
视图级别配置
class CommentViewSet(ModelViewSet): queryset = models.Comment.objects.all() serializer_class = app01_serializers.CommentSerializer authentication_classes = [MyAuth, ] permission_classes = [MyPermission, ]
6
1
class CommentViewSet(ModelViewSet):
2
3
queryset = models.Comment.objects.all()
4
serializer_class = app01_serializers.CommentSerializer
5
authentication_classes = [MyAuth, ]
6
permission_classes = [MyPermission, ]
全局配置
REST_FRAMEWORK = { "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ], "DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ] }
4
1
REST_FRAMEWORK = {
2
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ],
3
"DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ]
4
}
频率组件
频率:限制用户访问网站的频率
自定义限制类
VISIT_RECORD = {} # 自定义限制 class MyThrottle(object): def __init__(self): self.history = None def allow_request(self, request, view): """ 自定义频率限制60秒内只能访问三次 """ # 获取用户IP ip = request.META.get("REMOTE_ADDR") timestamp = time.time() if ip not in VISIT_RECORD: VISIT_RECORD[ip] = [timestamp, ] return True history = VISIT_RECORD[ip] self.history = history history.insert(0, timestamp) while history and history[-1] < timestamp - 60: history.pop() if len(history) > 3: return False else: return True def wait(self): """ 限制时间还剩多少 """ timestamp = time.time() return 60 - (timestamp - self.history[-1])
33
1
VISIT_RECORD = {}
2
# 自定义限制
3
class MyThrottle(object):
4
5
def __init__(self):
6
self.history = None
7
8
def allow_request(self, request, view):
9
"""
10
自定义频率限制60秒内只能访问三次
11
"""
12
# 获取用户IP
13
ip = request.META.get("REMOTE_ADDR")
14
timestamp = time.time()
15
if ip not in VISIT_RECORD:
16
VISIT_RECORD[ip] = [timestamp, ]
17
return True
18
history = VISIT_RECORD[ip]
19
self.history = history
20
history.insert(0, timestamp)
21
while history and history[-1] < timestamp - 60:
22
history.pop()
23
if len(history) > 3:
24
return False
25
else:
26
return True
27
28
def wait(self):
29
"""
30
限制时间还剩多少
31
"""
32
timestamp = time.time()
33
return 60 - (timestamp - self.history[-1])
视图级别配置
class CommentViewSet(ModelViewSet): queryset = models.Comment.objects.all() serializer_class = app01_serializers.CommentSerializer throttle_classes = [MyThrottle, ]
x
1
class CommentViewSet(ModelViewSet):
2
3
queryset = models.Comment.objects.all()
4
serializer_class = app01_serializers.CommentSerializer
5
throttle_classes = [MyThrottle, ]
全局配置
# 在settings.py中设置rest framework相关配置项 REST_FRAMEWORK = { "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ], "DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ] "DEFAULT_THROTTLE_CLASSES": ["app01.utils.MyThrottle", ] }
1
# 在settings.py中设置rest framework相关配置项
2
REST_FRAMEWORK = {
3
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ],
4
"DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ]
5
"DEFAULT_THROTTLE_CLASSES": ["app01.utils.MyThrottle", ]
6
}
认证组件的源码阅读
新构建的request里面的源码的user方法
源码走到了这里其实就已经需要我们自己来进行认证了。就可以对前端的请求进行认证,到底该怎么认证,还得继续往下走!
权限组件的源码阅读
相比较与认证组件,权限组件就更加的简洁了