python多线程与多进程开发实践及填坑记(3)

1. 前言

1.1. 概述

基于Flask、Pika、Multiprocessing、Thread搭建一个架构,完成多线程、多进程工作。具体需求如下:

  1. 并行计算任务:使用multiprocessing模块实现并行计算任务,提高计算效率、计算能力。
  2. 消息侦听任务:使用threading模块完成RabbitMQ消息队列的侦听任务,将接收到的数据放入multiprocessing.Queue中,以便并行计算任务处理。
  3. Web服务:使用Flask框架实现Web API服务,提供启停消息侦听任务、启停并行计算任务以及动态调整参数的功能。
  4. 任务交互:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
  5. 非阻塞运行:使用threading模块非阻塞地运行Flask Web服务。

1.2. 续上一篇《python多线程与多进程开发实践及填坑记(1)》

python多线程与多进程开发实践及填坑记(1)

1.3. 重新启动工作进程报错

AttributeError: Can’t get attribute ‘worker’ on <module ‘main’ (built-in)>

程序没有报错,但是,没有启动侦听服务线程。

@app.route('/startworking', methods=['GET'])
def start_worker():if len(processes) == 0: for _ in range(3):  p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params))  p.start()  processes.append(p)   print(f'process id = {p.pid}')return jsonify({'status': 'started working'}), 200else:return jsonify({'status': 'already working'}), 202

2. 原理解析

这里遇到的问题是由 Python 的 multiprocessing 模块在 Windows 上的行为引起的。具体来说,multiprocessing 模块在 Windows 上使用“spawn”启动方法,而不是“fork”,这意味着每个子进程需要能够导入主模块中的所有内容。如果某些对象不能被 pickle 化(例如局部函数),将会导致你看到的 AttributeError 错误。

为了解决这个问题,我们需要确保 worker 函数在主模块的全局命名空间中可用,将 worker 函数和其他依赖函数放到一个单独的模块中,然后在主模块中导入它们。

3. 拆分出代码

import pika
import json
from loguru import logger# 假设这是你的计算函数  
def compute_result(data, pso_params):  return {"result": data}# 发送结果到RabbitMQ的函数  
def send_result_to_rabbitmq(channel, exchange_name, queue_name, routing_key, result):  # 省略# 工作进程函数  
def worker(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params): logger.info('Worker started') # 省略logger.info('Worker finished')

关键改动:

  • 将 worker 函数和相关依赖函数移到 workers.py 模块中。
  • 在主模块中导入 worker 函数。
  • 保持 data_queue 的初始化和使用不变。
  • 这样可以确保在多进程环境中,每个子进程都可以正确地导入 worker 函数及其依赖函数。

4. python多线程与多进程及RabbitMQ程序架构

4.1. 模块划分

  1. 主模块 (main.py): 启动Flask服务,管理消息侦听任务和并行计算任务。
  2. Worker模块 (workers.py): 定义并行计算任务及其辅助函数。
  3. Utils模块 (utils.py): 定义RabbitMQ相关的辅助函数。

4.2. 代码实现

  1. 主模块

文件名称:main.py

from flask import Flask, jsonify
from threading import Thread, Event
import multiprocessing
import pika
import logging
from workers import worker
from utils import consume_from_rabbitmq_and_enqueue# 配置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)# 创建一个事件来控制侦听
stop_event = Event()# 定义web服务
app = Flask(__name__)@app.route('/startlistening', methods=['GET'])
def start_listening():if stop_event.is_set():stop_event.clear()if not hasattr(app, 'pika_thread') or not app.pika_thread.is_alive():app.pika_thread = Thread(target=consume_from_rabbitmq_and_enqueue, args=(rabbitmq_params, rabbitmq_queue, data_queue, stop_event))app.pika_thread.start()return jsonify({'status': 'listening'}), 200else:return jsonify({'status': 'already listening'}), 200@app.route('/stoplistening', methods=['GET'])
def stop_listening():if hasattr(app, 'pika_thread') and app.pika_thread.is_alive():stop_event.set()app.pika_thread.join()del app.pika_threadreturn jsonify({'status': 'stopped'}), 200  else:return jsonify({'status': 'not running'}), 400@app.route('/startworking', methods=['GET'])
def start_worker():if len(processes) == 0: for _ in range(3):  p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params))  p.start()  processes.append(p)   print(f'process id = {p.pid}')return jsonify({'status': 'started working'}), 200else:return jsonify({'status': 'already working'}), 202@app.route('/stopworking', methods=['GET'])
def stop_worker():for p in processes:data_queue.put(None)for p in processes:p.join()processes.clear() return jsonify({'status': 'stopped working'}), 200if __name__ == "__main__":rabbitmq_queue = 'energyStorageStrategy.queue'target_queue = 'energyStorageStrategy.queue.typc-fpd-tysh'target_exchange = 'energyStorageStrategy.direct'routing_key = 'typc-fpd-tysh'pso_params = {}  # 假设你的PSO参数credentials = pika.PlainCredentials('rabbit', 'rabbit****')  # mq用户名和密码rabbitmq_params = pika.ConnectionParameters('192.168.*.*', port=5671, virtual_host='/typc-fpd-dev', credentials=credentials)# 创建一个multiprocessing.Queue用于进程间通信  data_queue = multiprocessing.Queue()  # 创建工作进程列表processes = [] print(' [*] Waiting for messages. To exit press CTRL+C')flask_thread = Thread(target=lambda: app.run(host='0.0.0.0', port=5002, debug=True))flask_thread.start()with app.app_context():start_worker()start_listening()
  1. Worker模块

文件名称:workers.py

import pika
import json
from loguru import logger# 假设这是你的计算函数  
def compute_result(data, pso_params):  return {"result": data}# 发送结果到RabbitMQ的函数  
def send_result_to_rabbitmq(channel, exchange_name, queue_name, routing_key, result):  # 省略# 工作进程函数  
def worker(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params): logger.info('Worker started') # 省略logger.info('Worker finished')
  1. Utils模块

文件名称:utils.py

import pikadef consume_from_rabbitmq_and_enqueue(rabbitmq_params, rabbitmq_queue, data_queue, stop_event):# 略

5. 总结

  • 模块化:将不同的功能模块化,便于维护和扩展。
  • 多进程与多线程结合:使用multiprocessing实现并行计算任务,使用threading实现RabbitMQ消息侦听和Flask Web服务的非阻塞运行。
  • 进程间通信:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
  • 事件控制:通过threading.Event控制消息侦听任务的启停。

这种架构设计能够满足需求,并且具有较好的扩展性和可维护性。如果有更多具体的需求或优化,可以在此基础上进一步完善。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/41813.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BufferReader/BufferWriter使用时出现的问题

项目场景&#xff1a; 在一个文件中有一些数据&#xff0c;需要读取出来并替换成其他字符再写回文件中&#xff0c;需要用Buffer流。 问题描述 文件中的数据丢失&#xff0c;并且在读取前就为空&#xff0c;读取不到数据。 问题代码&#xff1a; File f new File("D:\\…

Python排序,你用对了吗?一文教你sorted和sort的正确姿势!

目录 1、sorted基础用法 🍏 1.1 列表排序入门 1.2 自定义排序规则 1.3 排序稳定性和key函数 2、sort内置方法操作 🔍 2.1 直接修改原列表 2.2 sort高级技巧与性能考量 2.3 案例:数据预处理实战 2.4 高级用法:reverse与cmp_to_key 3、应对复杂数据结构 🌐 3.1…

递归与分治算法-以高校学生就业管理系统为例

1.递归算法介绍 递归是一种在问题解决过程中自我调用的算法技术。一个递归函数会调用自身来解决问题的一个更小的部分。递归算法通常具有以下特点&#xff1a; 基本情形&#xff08;Base Case&#xff09;&#xff1a;递归必须有一个或多个基本情形&#xff0c;这样递归调用才…

Yolo系列再次更新——清华发布Yolov10端到端实时对象检测模型

前期我们刚介绍过Yolo系列模型,还以为Yolov9刚刚发布,也许今年不会再有什么更新。但是没有想到打脸如此之快,Yolov10端到端实时对象检测模型强势回归发布。Yolov10端到端实时对象检测 YOLOv10 是清华大学研究人员在YOLO软件包的基础上,引入了一种新的实时目标检测方法,解决…

python excel openpyxl

python excel LTS 在开始之前&#xff0c;确保已经安装了 Python 和所需的库。 主要使用以下库&#xff1a; openpyxl&#xff1a;用于读取和写入 Excel 文件。 pandas&#xff1a;用于数据处理和分析。 xlwings&#xff1a;用于将 Python 与 Excel 连接&#xff0c;实现双向…

从vs中删除自带的Microsoft Git Provider

vs自带的Git Provider非常不好用&#xff0c;每一次在Tools里面把Source Control调节成None&#xff0c; 下一次打开&#xff0c;又是Git Provider Make sure Visual Studio is closedOpen regeditNavigate to HKEY_CURRENT_USER\Software\Microsoft\VisualStudio\12.0_Confi…

HTTP协议格式

目录 正文&#xff1a; 1.概述 2.主要特点 3.请求协议格式 4.响应协议格式 5.响应状态码 总结&#xff1a; 正文&#xff1a; 1.概述 HTTP 协议是用于传输超文本数据&#xff08;如 HTML&#xff09;的应用层协议&#xff0c;它建立在传输层协议 TCP/IP 之上。当我们在…

视频参考帧和重构帧复用

1、 视频编码中的参考帧和重构帧 从下图的编码框架可以看出&#xff0c;每编码一帧需要先使用当前帧CU(n)减去当前帧的参考帧CU&#xff08;n&#xff09;得到残差。同时&#xff0c;需要将当前帧的重构帧CU*&#xff08;n&#xff09;输出&#xff0c;然后再读取重构帧进行预测…

js逆向抠js要点解析与案例分享

JavaScript&#xff08;JS&#xff09;逆向工程是一种技术&#xff0c;用于分析和理解JS代码的功能和行为&#xff0c;尤其是在源代码不可用或被混淆的情况下。逆向JS代码可以帮助开发者理解第三方库的工作机制&#xff0c;或者在调试和优化过程中定位问题。 要点一&#xff1…

七、MyBatis-Plus高级用法:最优化持久层开发-个人版

七、MyBatis-Plus高级用法&#xff1a;最优化持久层开发 目录 文章目录 七、MyBatis-Plus高级用法&#xff1a;最优化持久层开发目录 一、MyBatis-Plus快速入门1.1 简介1.2 快速入门回顾复习 二、MyBatis-Plus核心功能2.1 基于Mapper接口CRUDInsert方法Delete方法Update方法Se…

PyQt5中如何实现指示灯点亮和指示灯熄灭功能

一般上位机界面都会涉及指示灯点亮和指示灯熄灭功能&#xff0c;从网上下载该功能的上位机界面&#xff0c;学习如何使用PyQt5搭建具备指示灯点亮和指示灯熄灭效果的界面。 1. 上位机界面的效果展示 使用PyQt5实现以下界面&#xff0c;界面效果如下&#xff0c;界面图片是从网…

基于SpringBoot的招聘信息管理系统的详细设计和实现(源码+lw+部署文档+讲解等,欢迎咨询我!!)

文章目录 目录 文章目录 详细视频展示&#xff1a; 系统具体实现效果&#xff08;看看我的实力&#xff09; 技术栈&#xff08;详细的描述提供给同学思路参考&#xff09; 2.1 Java语言介绍 2.2 B/S架构 2.3 MySQL 数据库介绍 2.4 MySQL环境配置 2.5 SpringBoot框…

C++之static关键字

文章目录 前提正文多重定义extern关键字使用staticstatic 全局变量(在.cpp文件中定义)static变量存放在哪里static变量可不可以放在.h文件中 static 函数static局部变量static 成员变量static 成员函数 总结参考链接 前提 好吧&#xff0c;八股&#xff0c;我又回来了。这次想…

[图解]企业应用架构模式2024新译本讲解23-标识映射2

1 00:00:00,950 --> 00:00:02,890 好&#xff0c;我们往下走 2 00:00:04,140 --> 00:00:04,650 一样的 3 00:00:04,660 --> 00:00:07,170 这前面也见过了&#xff0c;定义一个对象数组 4 00:00:07,870 --> 00:00:12,820 数组的长度就是字段的数量&#xff0c;4个…

中值滤波法

中值滤波法 中值滤波法:连续采样N次(N取奇数),把N次采样值按大小排列,取中间值为本次有效值。 优点:能有效克服因偶然因素引起的波动干扰;对温度、液位的变化缓慢的被测参数有良好的滤波效果。 缺点:对流量、速度等快速变化的参数不宜。 #include <stdio.h> #i…

一.1.(3)半导体二极管基本电路的分析方法及常见应用电路

1.二极管基本电路的分析方法 先标正负极&#xff0c;再看是否理想二极管 将二极管视为断路&#xff0c;求两端电压 两端电压均大于导通电压&#xff0c;压差大的先导通&#xff08;由于电源不是完全的阶跃&#xff0c;而是有一个电压爬升的过程&#xff09; 2.常见应用电路 1.求…

【redis】redis知识点学习目录整理及简介

1、Redis概述 作者往期博文链接&#xff1a; 1、【redis】redis概述-CSDN博客 2、【redis】redis经典面试题20连问-CSDN博客 Redis定义&#xff1a;Redis是一个开源的、高性能的、基于内存运行的、非关系型的键值对NoSQL数据库。特点&#xff1a; 数据存储在内存中&#xf…

centos修改时间:系统时间、硬件时间

在CentOS上&#xff0c;修改时间可以通过以下步骤进行&#xff0c;涵盖系统时间和硬件时间&#xff08;RTC&#xff1a;Real-Time Clock&#xff09;。 系统时间是操作系统内核维护的时间硬件时间是系统主板上的时钟芯片维护的时间。 1. 修改系统时间 使用 date 命令来修改系…

c语言------------------分支结构

#语句 ## 空语句 c语言中最简单的语句就是空语句&#xff0c;其本身只包含一个分号。空语句本身不执行任何任务&#xff0c;但是有时也是有用的 ## 表达式语句 c语言中的语句本质上就是程序员的某些操作意图的体现。C语言中的单句是以分号结尾&#xff0c; 如&#xff1a…

日期选取限制日期范围antdesign vue

限制选取的日期范围 效果图 <a-date-pickerv-model"dateTime"format"YYYY-MM-DD":disabled-date"disabledDate"valueFormat"YYYY-MM-DD"placeholder"请选择日期"allowClear />methods:{//回放日期选取范围限制&…