multiprocessing多进程计算及与rabbitmq消息通讯实践

1. 需求与设计

我所设计的计算服务旨在满足多个客户对复杂计算任务的需求。由于这些计算任务通常耗时较长且资源消耗较大,为了优化客户体验并减少等待时间,我采取了并行计算的策略来显著提升计算效率。

为实现这一目标,我计划利用Python的multiprocessing库来构建一个高效的多进程计算服务。同时,为了与外部的RabbitMQ消息队列系统无缝集成,我选择了pika库作为与RabbitMQ通信的桥梁。

在计算服务的架构中,我设计了生产者和消费者两个核心角色。生产者负责监听RabbitMQ队列中的消息,每当接收到新的消息时,它将从消息中提取出待处理的数据,并将其放入multiprocessing.Queue中等待处理。这个过程充分利用了multiprocessing.Queue提供的进程间通信机制,确保数据的可靠传递和同步。

与此同时,消费者进程则负责从multiprocessing.Queue中取出待处理的数据,进行实际的计算工作。这些计算任务可能需要大约一个小时的时间来完成,取决于数据的复杂性和计算资源的分配情况。一旦计算完成,消费者进程将把结果重新封装成消息,并通过pika库发送回RabbitMQ的指定队列中,以便客户或其他系统能够获取到这些结果。

通过这种设计,我的计算服务能够同时处理多个客户的计算请求,并通过并行计算的方式提高整体的处理效率。同时,RabbitMQ的引入使得服务的可扩展性和可靠性得到了极大的提升,即使在面对高并发或系统故障的情况下,也能保证数据的完整性和服务的稳定性。

在这里插入图片描述

2. multiprocessing多进行模块

multiprocessing模块是Python标准库的一部分,提供了一种基于多进程的并行计算方法。通过使用multiprocessing模块,可以利用多个CPU核心来并行处理任务,从而提高程序的执行效率。与多线程相比,多进程避免了全局解释器锁(GIL)的限制,因此在CPU密集型任务中表现更优。

2.1. multiprocessing模块的核心概念和功能

  • 进程(Process):通过multiprocessing.Process类可以创建和管理独立的进程。每个进程都有自己的内存空间,互不干扰。

  • 进程间通信(IPC)multiprocessing提供了多种进程间通信的机制,如QueuePipeValueArray。在代码中,我们使用multiprocessing.Queue来在主进程和工作进程之间传递数据。

  • 同步(Synchronization)multiprocessing模块还提供了同步原语,如LockSemaphoreEventConditionBarrier,用于协调进程之间的操作。

2.2. 代码中的multiprocessing使用说明

以下是代码中使用multiprocessing的主要部分的解释:

创建和管理进程

# 创建工作进程
processes = []
for _ in range(3):p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_queue))p.start()processes.append(p)logger.info(f'Process started with PID: {p.pid}')
  • multiprocessing.Process类用于创建一个新的进程。
  • target参数指定了该进程将运行的目标函数,这里是worker函数。
  • args参数提供了传递给目标函数的参数。
  • p.start()方法启动了进程。

进程间通信

# 创建一个multiprocessing.Queue用于进程间通信
data_queue = multiprocessing.Queue()
  • multiprocessing.Queue用于在主进程和工作进程之间传递数据。它是线程和进程安全的队列,提供了先进先出(FIFO)的数据传递方式。

工作进程函数

def worker(data_queue, rabbitmq_params, target_queue):while True:try:data = data_queue.get()if data is None:breakresult = compute_result(data)try:logger.info('Worker started')connection = pika.BlockingConnection(rabbitmq_params)channel = connection.channel()except pika.exceptions.AMQPError as e:logger.error(f"Error connecting to RabbitMQ in worker: {e}")return            send_result_to_rabbitmq(channel, target_queue, result)except Exception as e:logger.error(f"An error occurred in worker: {e}")try:channel.close()connection.close()except pika.exceptions.AMQPError as e:logger.error(f"Error closing RabbitMQ connection in worker: {e}")logger.info('Worker finished')
  • worker函数从data_queue中获取数据进行处理。如果队列为空,data_queue.get()将阻塞直到有数据可用。
  • 处理完成后,结果通过send_result_to_rabbitmq函数发送到RabbitMQ。

终止工作进程

# 等待所有工作进程结束
for p in processes:data_queue.put(None)  # 发送None信号以终止工作进程
for p in processes:p.join()
  • data_queue发送None信号,通知工作进程终止。
  • 使用p.join()方法等待每个进程完成。这是一个阻塞调用,直到对应的进程终止。

2.3. 小结

  • multiprocessing模块通过创建独立的进程,利用多核CPU的能力并行处理任务。
  • multiprocessing.Queue用于进程间通信,确保数据安全地在进程间传递。
  • 通过错误处理和日志记录,提高了程序的健壮性和可维护性。

这使得程序能够在多核环境中高效运行,并能够处理各种异常情况,确保程序的稳定性和可靠性。

3. 实践代码

import pika  
import multiprocessing  
import time 
from loguru import logger# 假设这是你的计算函数  
def compute_result(data):  # 模拟计算过程  time.sleep(3 * 60)  # 假设需要3分钟,换成随机  return f"Result for {data}"  # 发送结果到RabbitMQ的函数  
def send_result_to_rabbitmq(channel, queue_name, result):  try:channel.queue_declare(queue=queue_name, durable=True)channel.basic_publish(exchange='',routing_key=queue_name,body=result,properties=pika.BasicProperties(delivery_mode=2,  # make message persistent))logger.info(f"发送结果消息:{result} 到RabbitMQ")except pika.exceptions.AMQPError as e:logger.error(f"Error sending result to RabbitMQ: {e}")raise # 从RabbitMQ接收数据并放入队列的函数(生产者)  
def consume_from_rabbitmq_and_enqueue(rabbitmq_connection, rabbitmq_queue, data_queue):  try:channel = rabbitmq_connection.channel()channel.queue_declare(queue=rabbitmq_queue, durable=True)def callback(ch, method, properties, body):try:data_queue.put(body.decode('utf-8'))logger.info(f"接收到消息:{body.decode('utf-8')}")except Exception as e:logger.error(f"Error putting message into data_queue: {e}")channel.basic_consume(queue=rabbitmq_queue, on_message_callback=callback, auto_ack=True)channel.start_consuming()except pika.exceptions.AMQPError as e:logger.error(f"Error consuming from RabbitMQ: {e}")raise# 工作进程函数  
def worker(data_queue,rabbitmq_params, target_queue):  # 由于计算时间长,连接很容易被断开#try:#    logger.info('Worker started')#    workerconnection = pika.BlockingConnection(rabbitmq_params)#    channel = workerconnection.channel()#    channel.queue_declare(queue = target_queue, durable=True)#except pika.exceptions.AMQPError as e:#    logger.error(f"Error connecting to RabbitMQ in worker: {e}")#    returnwhile True:  try:  # 从Queue中获取数据,如果队列为空,则阻塞等待  data = data_queue.get() print(f'data_queue.get() is {data}') if data is None:  # 收到None信号,表示应该退出  break  # 计算结果  result = compute_result(data)  try:logger.info('Worker started')workerconnection = pika.BlockingConnection(rabbitmq_params)channel = workerconnection.channel()channel.queue_declare(queue = target_queue, durable=True)except pika.exceptions.AMQPError as e:logger.error(f"Error connecting to RabbitMQ in worker: {e}")return                          # 发送结果到RabbitMQ  send_result_to_rabbitmq(channel, target_queue, result)  except Exception as e:  print(f"An error occurred: {e}")  try:channel.close()connection.close()except pika.exceptions.AMQPError as e:logger.error(f"Error closing RabbitMQ connection in worker: {e}")logger.info('Worker finished')# 主程序  
if __name__ == "__main__":  rabbitmq_queue = 'hello_world'target_queue = 'target_station_response_queue'# 设置RabbitMQ连接和队列  credentials = pika.PlainCredentials('rabbit', '****')  # mq用户名和密码rabbitmq_params = pika.ConnectionParameters('192.168.*.*',port = 5671,virtual_host = '/dev',credentials = credentials)# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。try:connection = pika.BlockingConnection(rabbitmq_params)except pika.exceptions.AMQPError as e:logger.error(f"Error connecting to RabbitMQ in main process: {e}")exit(1)print(' [*] Waiting for messages. To exit press CTRL+C')      # 创建一个multiprocessing.Queue用于进程间通信  data_queue = multiprocessing.Queue()  # 创建工作进程  processes = []  for _ in range(3):  p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_queue))  p.start()  processes.append(p) print(f'process id = {p.pid}') # 假设这里是生产数据到Queue的代码(这里用模拟数据代替) channel = connection.channel()for i in range(10):  # 发送10个模拟数据  send_result_to_rabbitmq(channel, rabbitmq_queue, str(i))time.sleep(2)  # 模拟生产数据的间隔  '''# 当所有数据都生产完毕后,发送None信号给所有工作进程以结束它们  for _ in range(3):  data_queue.put(None)  '''try:consume_from_rabbitmq_and_enqueue(connection, rabbitmq_queue, data_queue)except Exception as e:logger.error(f"Error in consuming from RabbitMQ and enqueuing: {e}")      # 等待所有工作进程结束for p in processes:data_queue.put(None)  # 发送None信号以终止工作进程for p in processes:p.join()try:connection.close()except pika.exceptions.AMQPError as e:logger.error(f"Error closing RabbitMQ connection in main process: {e}")logger.info("All processes have finished.")

4. 编者按

在当今日益复杂的数据处理场景中,高效、可靠的计算服务对于企业和个人用户而言都至关重要。特别是在需要处理大量计算任务,且这些任务耗时较长、资源消耗较大的情况下,如何优化计算流程、减少用户等待时间,成为了我们必须面对的挑战。

本博客详细探讨了如何利用Python的multiprocessing库和pika库构建一个高效、可扩展的多进程计算服务,并通过RabbitMQ实现与外部系统的消息通讯。这一实践不仅解决了计算资源瓶颈问题,还显著提升了服务的整体性能和可靠性。

在整个实践过程中,我们特别感谢“文言一心”提供的指导和帮助。文言一心能够根据我们的需求快速编写出程序框架,大大提高了我们的工作效率。同时,其丰富的实践经验和专业知识也为我们提供了宝贵的参考和借鉴。同时,也感谢ChatGPT验证上述方案和专业指导。

通过本博客的分享,我们希望能够为读者提供一种高效、可靠的多进程计算服务实现方案,并希望能够帮助读者更好地理解和应用Python的multiprocessing库和pika库。在未来的数据处理和计算服务中,我们相信这一实践将发挥越来越重要的作用。

欢迎反馈。

参考:

[1]. ithicker. 多进程(多核运算)Multiprocessing. CSDN博客. 2021.02
[2]. 擒贼先擒王. Python 多进程:multiprocessing、aiomultiprocess(异步多进程). CSDN博客. 2024.05

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/32065.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Java实训中心管理系统设计和实现(源码+LW+调试文档+讲解等)

💗博主介绍:✌全网粉丝10W,CSDN作者、博客专家、全栈领域优质创作者,博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌💗 🌟文末获取源码数据库🌟 感兴趣的可以先收藏起来,…

碳化硅陶瓷膜的生产工艺和应用

一、生产工艺 碳化硅陶瓷膜的生产工艺多样,其中浸渍提拉法和喷涂法为两大主流技术。 浸渍提拉法 浸渍提拉法是一种广泛应用的制备方法。其过程主要包括:先将陶瓷颗粒或者聚合物前体分散在水或有机溶剂中,形成均质稳定的制膜液。随后&#xff…

Jenkins macos 下 failed to create dmg 操作不被允许hdiutil: create failed - 操作不被允许?

解决方案: 打开设置,选择“隐私与安全”,选择“完全磁盘访问权限”,点击“”,选择jenkins的路径并添加。 同理,添加java的访问权限。

Python14 面向对象编程

1.什么是面向对象编程OOP Python的面向对象编程(Object-Oriented Programming,简称OOP)是一种编程范式,它使用“对象”来设计应用程序和计算机程序。这些对象由数据和能够操作这些数据的方法组成。面向对象编程的主要目标是提高软…

Webpack4从入门到精通以及和webpack5对比_webpack现在用的是哪个版本

3.1 打包样式资源css-loader、style-loader… {// 匹配哪些文件test: /\.less$/,// 使用哪些loader进行处理use: [// use数组中loader执行顺序:从右到左,从下到上,依次执行(先执行css-loader)// style-loader:创建style标签&#…

【C++】一个极简但完整的C++程序

一、一个极简但完整的C程序 我们编写程序是为了解决问题和任务的。 1、任务: 某个书店将每本售出的图书的书名和出版社,输入到一个文件中,这些信息以书售出的时间顺序输入,每两周店主会手工计算每本书的销售量、以及每个出版社的…

Vue74-路由传参2

一、$route中的params参数 二、在配置路由的index.js文件中&#xff0c;声明传参 占位符用的什么名字&#xff0c;params里面的key就是什么。 三、<router-link>标签中传参 3-1、to字符串写法 3-2、to的对象写法 注意&#xff1a;若是用params携带参数&#xff0c;不…

mysql的安装以及分享navicat for MySQL

前言 根据网上分享的安装方法以及自己遇到的问题解决方法 一、mysql是什么&#xff1f; mysql 是一个开放源码的小型关联式数据库管理系统 二、安装过程 1.下载安装包 下载地址&#xff1a;MySQL :: Download MySQL Community Server 跳过直接下载&#xff0c;解压即可 …

DPDK的Cache预取和Cache一致性

1.什么是Cache预取 众所周知&#xff0c;CPU访问Cache中的数据是比访问内存中的数据是要快的&#xff0c;而因为程序都有时间局部性和空间局部性&#xff0c;时间局部性简单来说就是某一条或几条指令在一段时间内会被CPU多次执行&#xff1b;空间局部性简单来说就是某一段数据块…

五十五、openlayers官网示例Loading Spinner解析——给地图添加loading效果,瓦片图层加载时等待效果

官网demo地址&#xff1a; Loading Spinner 这篇介绍了一个非常简单的loading效果 利用地图的loadstart和loadend事件&#xff0c;动态的添加和删除class名。 map.on("loadstart", function () {map.getTargetElement().classList.add("spinner");});map…

Vue72-路由传参1

一、需求 点击哪个消息&#xff0c;就展示哪个消息的详情 这是一个三级路由&#xff01; 给路由组件&#xff1a;detail.vue传递消息数据。 二、代码步骤 2-1、编写路由组件 从$route.query属性里面获取传参 2-2、编写路由规则 2-3、编写路由标签&#xff0c;传参 1、to的字…

Ncorr使用过程的问题解答

问题系列 文章目录 问题系列前言一、如何更改单位&#xff1f;情景&#xff1a;DIC Analysis 二、拉格兰日和欧拉绘图的区别直观 三、控制图像中的显示条上下界限问题展示&#xff1a;解决方案&#xff1a; 更新动态 前言 主要用于记录使用过程中出现的相关问题。 一、如何更改…

数据结构:为什么说链表是顺序表的升级版(c语言实现)

前言&#xff1a; 我们在之前的几篇文章中详细的讲解了顺序表的特点&#xff0c;增删改查操作和动态顺序表的优点&#xff0c;并使用顺序表的底层结构实现了通讯录项目&#xff0c;似乎顺序表是一个非常完美的数据结构&#xff0c;它可以实现按照需求实现增删查改&#xff0c;对…

做好海外ASO优化的7大核心要素你了解几个?

海外App进行ASO优化时&#xff0c;需要综合考虑多个方面以确保应用在应用商店中获得更高的曝光率和下载量。以下是一些关键的ASO优化步骤&#xff0c;结合参考文章中的相关信息进行详细阐述&#xff1a; 1.关键词优化 调研目标市场的用户行为和检索习惯&#xff0c;挖掘与应用…

锂磷硫(LPS)属于硫化物固态电解质 Li7P3S11是代表性产品

锂磷硫&#xff08;LPS&#xff09;属于硫化物固态电解质 Li7P3S11是代表性产品 锂磷硫&#xff08;LPS&#xff09;&#xff0c;为非晶态材料&#xff0c;是硫化物固态电解质代表性产品之一&#xff0c;具有热稳定性好、成本较低等优点&#xff0c;在固态电解质中离子电导率较…

【Deep Learning】Meta-Learning:训练训练神经网络的神经网络

元学习&#xff1a;训练训练神经网络的神经网络 本文基于清华大学《深度学习》第12节《Beyond Supervised Learning》的内容撰写&#xff0c;既是课堂笔记&#xff0c;亦是作者的一些理解。 1 Meta-Learning 在经典监督学习中&#xff0c;给定训练数据 { ( x i , y i ) } i \{…

使用Spring Boot实现用户认证和授权

文章目录 引言第一章 Spring Boot概述1.1 什么是Spring Boot1.2 Spring Boot的主要特性 第二章 用户认证和授权基础知识2.1 用户认证2.2 用户授权2.3 Spring Security概述 第三章 项目初始化第四章 实现用户认证和授权4.1 定义用户实体类和角色实体类4.2 创建Repository接口4.3…

IntelliJ IDE 插件开发 | (十)主题插件开发入门

系列文章 本系列文章已收录到专栏&#xff0c;交流群号&#xff1a;689220994&#xff0c;也可点击链接加入。 前言 在前面的章节中&#xff0c;我们介绍的都是功能性插件的开发内容&#xff0c;本文则会介绍一下主题类插件的开发方式。不过本文也只是带大家入个门&#xff…

靠3个字寻求机会,情商不够,别勉强自己

之前我分享了一篇文章寻求一个自由职业的前端伙伴&#xff0c;吸引了好几位朋友来咨询合作&#xff0c;中间出现了不少插曲&#xff0c;好在结果是令人满意的。 作为一名初次创业者&#xff0c;我承认很多地方做的不是那么到位&#xff0c;比如招聘合作伙伴&#xff0c;理想的状…

LLM2Vec论文阅读笔记

这是篇LLM论文&#xff0c;用decoder-like的LLM去提取embedding文章认为&#xff0c;decoder-like的LLM在text embedding task表现不优的一大原因就是其casual attention mechanism&#xff0c;其实就是mask的问题。所以只要对现有的decoder-only LLM进行如下三步改进&#xff…