前言
因为CMDB内部的需求,需要一个API进行数据传输,用来传递需要抓取的服务端信息信息给抓取的autoclient,autoclient抓取好之后再通过API传输到服务器,保存到数据库。但是为了防止恶意的API访问,需要做一个验证。
设想一
可以在客户端跟服务端都规定好一串随机字符串做验证,只有当带着这串验证的请求发送过来的时候,才让其进行访问。
如果学过了爬虫,大家很容易就发现,这串随机字符串在浏览器里面是可以监听的,多观察几次总是会发现的。而且无论通过如何的方式,只要暴露在外面,都是会被察觉的。此时,就需要对其进行加密。
设想二
既然随机字符串在web传输中是明文状态,那我们试着将其转换成密文的,转换下思路,如果每次的请求都是随机字符串配合一串一直变动的值进行md5加密,此时,产生的验证字符串也应该变成动态密文。用什么做动态字符串来配合约定好的字符串进行md5加密呢?既然是动态的,时间戳,是最好的选择。
此时一切看着都很完美,但是,忽略了一点,无论是不是加密的字符串,http请求的时候都是可以监听到的,所以即使加密,即使不知道怎么加密的,依旧可以直接拿着这串字符串直接进行验证访问。尴尬。。。
设想三
其实上面的设想二已经做到了动态,思路上只要改一点就立刻变成可行的了。在http请求的时候,即使是在相当糟糕的网络环境里,也不会需要发送非常长的时间,因此,卡住时间便可以实现。
在拿到client的时间戳是,服务器段只需要跟当前时间戳进行比对,如果时间间隔小于10秒就当作正常访问。就可以了。
上面的设计思路完美的解决了动态的问题,此时不免还有疑问,如果真的会在10s内盗取到字符串直接访问呢?
完善思路
基于上面的问题,可以再多加异步验证,写一个列表,访问过的字符串都放在列表里,后面的访问都跟此列表比对下,如果在此列表内,就拒绝访问。
如上的设计思路就可以解决问题。
优化
最后的设计思路肯定能解决问题,但是也存在一个问题,就是,随着时间进度的推移,访问列表一定会越来越大,始终是不友好的一点。肯定需要给字符串设计一个超时时间。
visited_list = ['28g12b12128912e2kj|127381237812391', '829312g12be120e102ej12je91|12312984123123',....]
如果以上述的方式取跟系统时间比较当然是一件很费事的工作,占用很多的IO,可以使用redis来轻松实现这个功能。
CBV通过此类装饰方式实现验证
验证代码
def api_auth_method(request):auth_key = request.META.get('HTTP_AUTH_KEY')if not auth_key:return Falsesp = auth_key.split('|')if len(sp) != 2:return Falseencrypt, timestamp = sptimestamp = float(timestamp)limit_timestamp = time.time() - ASSET_AUTH_TIMEprint(limit_timestamp, timestamp)if limit_timestamp > timestamp:return Falseha = hashlib.md5(ASSET_AUTH_KEY.encode('utf-8'))ha.update(bytes("%s|%f" % (ASSET_AUTH_KEY, timestamp), encoding='utf-8'))result = ha.hexdigest()print(result, encrypt)if encrypt != result:return Falseexist = Falsedel_keys = []for k, v in enumerate(ENCRYPT_LIST):print(k, v)m = v['time']n = v['encrypt']if m < limit_timestamp:del_keys.append(k)continueif n == encrypt:exist = Truefor k in del_keys:del ENCRYPT_LIST[k]if exist:return FalseENCRYPT_LIST.append({'encrypt': encrypt, 'time': timestamp})return Truedef api_auth(func):def inner(request, *args, **kwargs):if not api_auth_method(request):return JsonResponse({'code': 1001, 'message': 'API授权失败'}, json_dumps_params={'ensure_ascii': False})return func(request, *args, **kwargs)return inner