说明
为了简化开发程序,特别是需要依赖全局数据的程序,例如:分布式任务需要知道当前可处理的任务;定时程序需要依据某个约束性全局变量。一个附带的好处是会大量减少对数据库产生的请求。
GlobalBuffer的代价并不高:
- 1 一个新的服务
- 2 多一次的序列和反序列化
内容
1 微服务结构
微服务 - 对象 - 微服务
这大约是第一次再用微服务去套调用微服务的对象。层级关系大约是这样的:
- 1 先构建了Redis数据库容器,用于一般的KV存储和消息队列(功能简单)。
- 2 因为直接访问Redis比较麻烦,需要有对应的py包等,所以构造了RedisAgent,用微服务方式来代理数据库请求。
- 3 因为直接调用接口写起来很麻烦,所以构造了RedisOrMongo对象,结合了Redis缓存和Mongo持久化存储的功能,进行初始化配置后就可以进行简单的重复调用。
- 4 不是所有的程序都会频繁的操作,可能只是偶尔读一次。而且程序所处的环境可能与算网的环境不在一起,频繁配置反而成了比较麻烦的事,所以现在又想构造GlobalBuffer来提供特别简单的数据操作透传。
2 设计
简单以及更简单
微服务的功能是很容易拓展的,本阶段要做的是第一步:就是透传KV操作。
GlobalBuffer 提供了一个可外网访问的地址,提供KV的读取与存储。同时GlobalBuffer需要给出变量命名的约束检查。
这样在许多程序分布式执行时,可以通过某个变量
3 实现
先进行命名: global_buffer_24088
再创建了项目文件夹之后,先按照GlobalFunc的【场景四:任意环境】方式将RedisOrMongo的相关文件拉取过来。需要额外关注的是,本来也可以直接使用【 场景二:算网机,挂载git目录使用】方式的,但是一方面最近没有时间去check这个镜像里内容是否是最新的;另一方面,当前需求确实足够简单,因此还是用前者。
场景四只要从ftp上下载几个文件就可以了
wget -NO RedisOrMongo_v100.py http://ftp/BaseFunc.RedisOrMongo_v100.py
wget -NO WMongo_V9000_012.py http://ftp/BaseFunc.WMongo_V9000_012.py
wget -NO worker_gen_file.py http://ftp/BaseFunc.worker_gen_file.py
采用Tornado来实现接口
server.py
from RedisOrMongo_v100 import RedisOrMongo,Naive,redis_cfg
# 各种rom的缓存
rom_dict = {}GetxHandler_path = r'/getx/'
class GetxHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)space = some_dict['space']varname = some_dict['varname']ttl = some_dict.get('ttl') or 86400if space not in rom_dict.keys():the_rom = RedisOrMongo(space, redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065')rom_dict[space] = the_romelse:the_rom = rom_dict[space] res = the_rom.getx(varname,ttl = ttl)self.write(json.dumps(res))
getx_tuple = (GetxHandler_path,GetxHandler)
app_list.append(getx_tuple)SetxHandler_path = r'/setx/'
class SetxHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)space = some_dict['space']varname = some_dict['varname']value = some_dict['value']ttl = some_dict.get('ttl') or 86400persist = True if some_dict.get('persist') is not None else False if space not in rom_dict.keys():the_rom = RedisOrMongo(space, redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065')rom_dict[space] = the_romelse:the_rom = rom_dict[space] # 如果persist 不为True那么只写redisres = the_rom.setx(varname,value,ttl = ttl,persist=persist)self.write(json.dumps(res))
setx_tuple = (SetxHandler_path,SetxHandler)
app_list.append(setx_tuple)if __name__ == '__main__':#tornado.options.parse_command_line()apps = tornado.web.Application(app_list, **settings)http_server = tornado.httpserver.HTTPServer(apps)define('port', default=8000, help='run on the given port', type=int)http_server.listen(options.port)# 单核# 多核打开注释# 0 是全部核# http_server.start(num_processes=10) # tornado将按照cpu核数来fork进程# ---启动print('Server Started')tornado.ioloop.IOLoop.instance().start()
只要启动server.py就可以了。
使用测试
import requests as req ...: req.get('http://%s:24088/' % wan_ip).text...:
Out[7]: '【GET】This is Website for Internal API SystemPlease Refer to API document'In [8]: getx_dict = {'space':'tem_test.test',...: 'varname':'x002',...: 'ttl': 500 # 可以不写,默认86400...: }...:...: getx_res = req.post('http://%s:24088/getx/' % wan_ip,json = getx_dict).json()...:In [9]: getx_res
Out[9]: 123In [10]: # setx 测试...: setx_dict = {'space':'tem_test.test',...: 'varname':'x002',...: 'value':11111,...: 'ttl': 500, # 可以不写,默认86400...: 'persist': 'yes' # persist 可以不写,为空时不持久化到mongo...: }...:...: setx_res = req.post('http://%s:24088/setx/' % wan_ip,json = setx_dict).json()...:In [11]: setx_res
Out[11]: TrueIn [12]: getx_res = req.post('http://%s:24088/getx/' % wan_ip,json = getx_dict).json()In [13]: getx_res
Out[13]: 11111In [14]: getx_res = req.post('http://%s:24088/getx/' % wan_ip,json = getx_dict).json()In [15]: getx_res
Out[15]: 11111
其中getx是从redis中取数(如果redis中没有数,那么就从mongo中读取),同时也可以设置ttl,这样取完之后一段时间数据会自动删除;setx则是设置变量,其中还有一个persist参数,如果不为空,那么就会持久化到mongo中;否则只是在redis中设置。
操作最慢大约耗时100ms,最短的不到3ms。