Python 全栈系列239 使用消息队列完成分布式任务

说明

在Python - 深度学习系列32 - glm2接口部署实践提到,通过部署本地化大模型来完成特定的任务。

由于大模型的部署依赖显卡,且常规量级的任务需要大量的worker支持,从成本考虑,租用算力机是比较经济的。由于任务是属于超高计算传输比的类型,且算力机随时可能出现不稳定的情况。

所以,使用消息队列完成此项任务是比较合适的。本次目标:

  • 1 回顾并快速搭建RabbitMQ和RabbitAgent服务的方法
  • 2 在无端口算力租用商(AutoDL)下部署chatglm2服务,并启动Worker处理数据
  • 3 在有端口算力租用商(仙宫云)下部署chatglm2服务,并用nginx反向代理,然后在异地启动worker测试

内容

1 构建消息队列(Server)

1.1 RabbitMQ镜像

先采用之前的命令启动
在这里插入图片描述
在算力机使用阿里云镜像仓库拉取,分钟级完成启动
在这里插入图片描述

1.2 RabbitAgent服务

理论上,应该封装为镜像后,以容器方式启动。不过租用的算力机系统盘太小(50G),装完CUDA之后只剩下10G多的空间,所以这次就把项目文件搬过去,在宿主机启动。
以后这类轻量级的服务,可以用一个很小的python环境镜像封装。

在这里插入图片描述

在这里插入图片描述

res = req.post('http://IP:24098/send_workq_message/', json = para_dict)
<Response [200]>
# 6 永久启动服务nohup python3 server.py  >/dev/null 2>&1 &

在这里插入图片描述
消费者(手动确认消息模式)

            import pikaimport jsoncredentials = pika.PlainCredentials('user', 'passwd')connection = pika.BlockingConnection(pika.ConnectionParameters('IP', port, '/', credentials))channel = connection.channel()def callback(ch, method, properties, body):input_data = json.loads(body.decode())print(f" [x] Received ",input_data)# time.sleep(body.count(b'.'))print(" [x] Done")ch.basic_ack(delivery_tag = method.delivery_tag)# channel.queue_declare(queue='hello1',durable=True)# 消费者预取消息数channel.basic_qos(prefetch_count=3)# 1 消费持久化的队列#channel.basic_consume(queue='hello1',#                        on_message_callback=callback, auto_ack =False)    # 2 消费非持久化队列channel.basic_consume(queue='hello2',on_message_callback=callback, auto_ack =False)    print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()[*] Waiting for messages. To exit press CTRL+C[x] Received  {'msg_id': 1, 'msg': 'first msg'}[x] Done[x] Received  {'msg_id': 2, 'msg': 'second msg'}[x] Done[x] Received  {'msg_id': 1, 'msg': 'first msg'}[x] Done[x] Received  {'msg_id': 2, 'msg': 'second msg'}[x] Done

1.3 将任务数据通过RabbitAgent写入

写入2.8万条,耗时5秒。
在这里插入图片描述

2 无端口算力租用商Worker测试

除了一定要返回的结果数据,还应该加上机器名称,显卡配置与处理时长。

2.1 启动服务

无端口算力机的代表就是AutoDL了,他们家机器也偏贵,4090一小时2.5~2.6元,比仙宫云高不少(我还是比较prefer后者的)。目前暂时没发现AutoDL有什么特别的优点,中规中矩。

在这里插入图片描述

发送文件

rsync -rvltz  -e 'ssh -p 44620'  --progress /data/implement_container_file/chatglm2_6b_int4_api/workspace  root@connect.westb.seetacloud.com:/root/autodl-tmp/

在这里插入图片描述
然后修改api.py中模型加载的位置和端口号,启动3个服务。

以下是获取单条数据并进行调试的方法

import pika
import json
credentials = pika.PlainCredentials('x', 'xxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('xxxx', 24091, '/', credentials,heartbeat=600))
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='ent_intro_task', durable=True)
# 从队列中获取一条消息
method_frame, header_frame, body = channel.basic_get(queue='ent_intro_task')
data = body.decode('utf-8')
data1 = json.loads(data)
...
connection.close()

完成测试之后,打包为ent_intro_worker.py,该脚本接受一个端口输入,以便将worker和server匹配起来,充分利用资源。

import pika
import json
import timeimport sys
import requests as req # 获取命令行参数
if len(sys.argv) > 1:parameter_value = sys.argv[1]print("传入的参数值为:", parameter_value)
else:print("未传入参数")
def send_resp(a_message):message_list = [a_message]para_dict = {}para_dict['rabbit'] = 'rabbit01'para_dict['routing_key'] = 'ent_intro_result'para_dict['durable'] = Truepara_dict['message_list'] = message_listpara_dict['queue'] = 'ent_intro_result'resp = req.post('http://IP:PORT/send_workq_message/', json = para_dict)return True tmp ='''
成立日期:%s 
注册地址:%s
%s简介,字数在100-200字之间
'''
credentials = pika.PlainCredentials('andy', 'andy123')
connection = pika.BlockingConnection(pika.ConnectionParameters('IP', PORT, '/', credentials, heartbeat=600))channel = connection.channel()# 手动确认
def callback(ch, method, properties, body):input_data = json.loads(body.decode())print(f" [x] Received ",input_data)tick1 = time.time()prompt_content = {'prompt': tmp % (input_data['reg_dt'], input_data['addr'], input_data['ent_table_name'] )}res = req.post('http://127.0.0.1:%s/' % parameter_value, json =prompt_content).json()tick2 = time.time()a_message = {}a_message['company'] =  input_data['ent_table_name']a_message['intro'] =  res['response']a_message['spends'] = tick2-tick1send_resp(a_message)print(" [x] Done")ch.basic_ack(delivery_tag = method.delivery_tag)channel.queue_declare(queue='ent_intro_task',durable=True)
channel.basic_qos(prefetch_count=3)
channel.basic_consume(queue='ent_intro_task',on_message_callback=callback, auto_ack =False)    print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

该worker获取数据,然后向本地大模型服务请求结果,然后将结果写到结果队列。启动worker进行测试,python3 ent_intro_worker.py 24096
没问题后就转入后台运行:nohup python3 ent_intro_worker.py 24096 >/dev/null 2>&1 &

在这里插入图片描述

3 有端口算力租用商Worker测试

3.1 负载均衡

由于单个的量化模型不足以充分利用显卡的性能,所以就要启动多个同样的服务。调用时需要进行多个服务的端口指定,这样就比较麻烦。
在这里插入图片描述
用nginx进行负载均衡,然后只暴露一个端口作为服务接口。然后接下来在远程主机调用这个服务接口(worker)。

租用一台仙宫云主机。数据上传有点问题,感觉它的云盘是外挂的,而且不稳定。最终我把数据先传到系统盘,再从系统盘传到云盘才成功。另外在启动服务时,模型的加载时间明显太长了。感觉云盘是机械盘。

rsync -rvltz  -e 'ssh -p 111'  --progress /data/implement_container_file/chatglm2_6b_int4_api/workspace  root@m1ehp5n70rxvg81b.ssh.x-gpu.com:/root/
==> /root/cloud/

安装包

pip3 install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/

启动三个服务。

安装、配置并启动nginx。

events {#设置工作模式为epoll,除此之外还有select,poll,kqueue,rtsig和/dev/poll模式use epoll;#定义每个进程的最大连接数,受系统进程的最大打开文件数量限制worker_connections  1024;
}http{# 配置nginx上传文件最大限制client_max_body_size 50000m;upstream multi_ma {# fair;server 172.17.0.1:10000 ;server 172.17.0.1:10001 ;server 172.17.0.1:10002 ;}server {listen 80;location / {proxy_pass http://multi_ma;}}}

远端使用worker调用。

实操时发现,虽然仙宫云可以给一个80端口,但是似乎也是容器里的虚拟环境,不让再安装包了,所以也没法安装nginx。不过理论上应该可以实现。

最后,还是用类似AutoDL的方式启动3个worker。

两块4090之后,速度明显快多了。
在这里插入图片描述

3.2 获取结果并入库

建立对应的表

# 2 导入包
from Basefuncs import * # 快速载入连接
def make_local_wmongo_connect(server_name):try:tem_w = from_pickle(server_name)print('【Loading cur_w】from pickle')except:w = WMongo('w')tem_w = w.TryConnectionOnceAndForever(server_name =server_name)to_pickle(tem_w, server_name)return tem_wm8_cur_w = make_local_wmongo_connect('m8.24003')# 建立索引
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='pid')
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='company')
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='model_name')

从结果队列里取数,然后入库

# 封装函数
def get_some_batch_updated():credentials = pika.PlainCredentials('xxx', 'xxx')connection = pika.BlockingConnection(pika.ConnectionParameters('IP', port, '/', credentials, heartbeat=600))# 2 迭代的获取数据res_list = []with connection.channel() as channel:for i in range(100):# 声明一个队列channel.queue_declare(queue='ent_intro_result', durable=True)# 从队列中获取一条消息method_frame, header_frame, body = channel.basic_get(queue='ent_intro_result')res_dict = json.loads(body.decode())res_list.append(res_dict)channel.basic_ack(delivery_tag=method_frame.delivery_tag)# 3 拼凑为标准数据框res_df = pd.DataFrame(res_list)# 增加必要的模型字段res_df['model_name'] = 'chatglm2_6b_int4'res_df['pid'] = (res_df['company'] + res_df['model_name']).apply(md5_trans)m8_cur_w.insert_or_update_with_key(tier1 = 'llm', tier2 = 'company_intro', data_listofdict= res_df.to_dict(orient='records') , key_name='pid')connection.close()# 获取并存储100条
get_some_batch_updated()

4 结语

本次完成了:

  • 1 RabbitMQ 和 RabbitAgent的建立。这使得其他机器可以不必要使用端口,非常适合超高计算传输比的任务。
  • 2 将原始数据通过rabbit agent 发布到任务队列
  • 3 将chatglm2-6b部署到算力租用机:测试了主流的三家autodl, anygpu和仙宫云,都是ok的
  • 4 在各算力机上启动worker进行处理
  • 5 将结果获取,然后存在本地的mongo

没能成功完成的实践是在仙宫云使用nginx做负载均衡,简化worker的请求。

结论:用llm来做任务成本还是比较高的。价格折算下来,大约 ¥1/千条。所以,要把大模型用在高价值领域,例如替代人工打标,写函数这些。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/811594.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QA测试开发工程师面试题满分问答11: web前端页面视频组件无法播放如何定位bug

当 web 前端页面的视频组件无法播放时&#xff0c;可以从以下维度进行分析和定位可能的 bug&#xff0c;分析维度包括但不限于&#xff1a;前端功能点、缓存、异常、后端功能点、资源占用、并发、网络等&#xff1a; 前端功能点&#xff1a; HTML5 视频支持&#xff1a;检查视频…

等保测评2.0——网络安全等级保护测评的初步了解

一、什么是网络安全等级保护测评&#xff1f; 二、网络安全等级保护&#xff0c;保护的是什么&#xff1f; 等级保护对象&#xff1a;网络安全等级保护工作直接作用的对象。&#xff08;注&#xff1a;主要包括信息系统、通信网络设施和数据资源等&#xff09; 计算机信息系统…

Qotom Q720G5英特尔赛扬处理器N4000高性价比无风扇迷你电脑5网口软路由防火墙

在数字时代&#xff0c;迷你电脑已经成为高效、灵活的解决方案&#xff0c;无论是个人用户还是企业用户&#xff0c;都能从中受益。Qotom Q720G5 无风扇迷你电脑就是这样一款强大的选择&#xff0c;它不仅可以作为软路由、防火墙和路由器&#xff0c;还有着更多的潜力等待发掘。…

中国手机频段介绍

中国目前有三大运营商&#xff0c;分别是中国移动、中国联通、中国电信&#xff0c;还有一个潜在的运营商中国广电&#xff0c;各家使用的2/3/4G的制式略有不同 中国移动的GSM包括900M和1800M两个频段。 中国移动的4G的TD-LTE包括B34、B38、B39、B40、B41几个频段&#xff0c;…

苹果全力升级:用专注AI的M4芯片彻底改造Mac系列

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

通过Transform与Animation,来探索CSS中的动态视觉效果

在 transform 和 animation 出现之前&#xff0c;前端开发者通常需要编写大量的 JavaScript 代码来实现动态效果。然而&#xff0c;这两个 CSS 属性的引入极大地简化了丰富动效和过渡效果的实现&#xff0c;从而让用户界面更加引人入胜&#xff0c;交互体验更为流畅。本文将深入…

最优算法100例之44-不用加减乘除做加法

专栏主页:计算机专业基础知识总结(适用于期末复习考研刷题求职面试)系列文章https://blog.csdn.net/seeker1994/category_12585732.html 题目描述 不用加减乘除做加法 题解报告 最优解法:使用异或 1)异或是查看两个数哪些二进制位只有一个为1,这些是非进位位,可以直接…

小程序地理位置权限申请+uniapp调用uni.getLocation

文章目录 一、小程序地理位置权限申请二、uniapp调用uni.getLocation 一、小程序地理位置权限申请 需要确保小程序类目已经填写 点击左侧导航栏找到最后的“设置”——“基本设置”——“前往填写” 在开发管理——接口设置——地理位置中可以看到&#xff1a; 即可点击想要申…

智能物联网远传冷水表管理系统

智能物联网远传冷水表管理系统是一种基于物联网技术的先进系统&#xff0c;旨在实现对冷水表的远程监测、数据传输和智能化管理。本文将从系统特点、构成以及带来的效益三个方面展开介绍。 系统特点 1.远程监测&#xff1a;系统可以实现对冷水表数据的远程监测&#xff0c;无…

uni-app实现下拉刷新

业务逻辑如下&#xff1a; 1.在滚动容器中加入refresher-enabled属性&#xff0c;表示为开启下拉刷新 2.监听事件&#xff0c;添加refresherrefresh事件 3.在事件监听函数中加载数据 4.关闭动画&#xff0c;添加refresher-triggered属性&#xff0c;在数据请求前开启刷新动画…

单片机之蓝牙通信

目录 蓝牙介绍 HC05蓝牙模块 HC05参数 HC05引脚 各个引脚功能 HC05模块的作用 工作模式 配置模式 引脚接线 用AT指令进行配置 常用的AT指令 正常模式 测试步骤 烧录的程序 前言&#xff1a; keil文件 蓝牙介绍 蓝牙&#xff1a;Bluetooth&#xff0c;其是低成…

企业航拍VR全景视频展示仿如上门参观

360度VR全景视频因其广阔的视野和身临其境的体验&#xff0c;无论再房产楼盘的精致呈现&#xff0c;旅游景点的全景漫游&#xff0c;还是校园风光的生动展示&#xff0c;都成为企业商户的首选。 360度vr全景视频编辑软件是深圳VR公司华锐视点提供多种常见的三维仿真场景供选择&…

【Python细类】全局日志调试模式

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

goproxy 简单介绍 及一键安装脚本

goproxy 官网 https://goproxy.cn/ GoProxy 是一项用于 Go 模块的高性能代理服务&#xff0c;旨在为 Go 开发人员提供更快速、更可靠的模块下载体验。它提供以下主要功能&#xff1a; 全球分布式代理服务器: GoProxy 在全球多个地区部署了代理服务器&#xff0c;例如拉斯维加…

【电控笔记6】电流回路+延迟效应

问题提出 数字控制系统的delay: 5.4节有介绍T0=0.5TS 低通滤波器的时间常数? 可用示例程序 m2 2 1b 如下图画出开环系统的伯德图进行比较,如图 2-2-4 所示,由于延迟组件会侵蚀系统的相位,因此从图可以看出,加入延迟效应后,q轴电流回路的相位裕度(Phase Margin) 从…

CSS3 平面 2D 变换+CSS3 过渡

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 ✍一、CSS3 平面 2D 变换&#x1f48e;1 坐标轴&#x1f48e;2 transform 语法…

【Hadoop】下载安装及伪分布式集群搭建教程

目录 1.概述 2.环境准备 3.hadoop安装 3.1.下载安装配置 3.2.伪分布式集群 3.3.注意事项 4.Hadoop集群的组成 1.概述 hadoop有三种安装模式 单机模式&#xff0c;只在一台机器上运行&#xff0c;存储用的本地文件系统而不是HDFS。 伪分布式模式&#xff0c;存储采用HD…

HarmonyOS实战开发-录音机、如何实现音频录制和播放的功能

介绍 本示例使用audio相关接口实现音频录制和播放的功能&#xff0c;使用mediaLibrary实现音频文件的管理。 相关概念&#xff1a; AudioRecorder&#xff1a;音频录制的主要工作是捕获音频信号&#xff0c;完成音频编码并保存到文件中&#xff0c;帮助开发者轻松实现音频录…

3d里怎么让模型直接显示材质---模大狮模型网

在3D设计和渲染中&#xff0c;使模型直接显示材质是一个常见但也关键的需求。直接显示材质可以帮助设计师更直观地预览和编辑模型的外观&#xff0c;从而提高工作效率并确保最终效果符合预期。本文将介绍一些方法和技巧&#xff0c;帮助你在3D设计中实现模型直接显示材质的目标…

进口PFA容量瓶高纯透明聚四氟乙烯材质耐强酸碱PFA定容瓶

PFA容量瓶&#xff0c;也叫特氟龙容量瓶&#xff0c;是用于配制标准浓度溶液的实验室器皿&#xff0c;是有着细长颈、梨形肚的耐强腐蚀平底塑料瓶&#xff0c;颈上有标线&#xff0c;可直接配置标准溶液和准确稀释溶液以及制备样品溶液。 因其有着不易碎、材质纯净、化学稳定性…