1. 前言
1.1. 概述
基于Flask、Pika、Multiprocessing、Thread搭建一个架构,完成多线程、多进程工作。具体需求如下:
- 并行计算任务:使用
multiprocessing
模块实现并行计算任务,提高计算效率、计算能力。 - 消息侦听任务:使用
threading
模块完成RabbitMQ消息队列的侦听任务,将接收到的数据放入multiprocessing.Queue
中,以便并行计算任务处理。 - Web服务:使用
Flask
框架实现Web API服务,提供启停消息侦听任务、启停并行计算任务以及动态调整参数的功能。 - 任务交互:通过
multiprocessing.Queue
实现消息侦听任务与并行计算任务之间的资源交互。 - 非阻塞运行:使用
threading
模块非阻塞地运行Flask Web服务。
1.2. 续上一篇《python多线程与多进程开发实践及填坑记(1)》
python多线程与多进程开发实践及填坑记(1)
1.3. 重新启动侦听线程失败
程序没有报错,但是,没有启动侦听服务线程。
@app.route('/startlistening', methods=['GET'])
def start_listening():if stop_event.is_set():stop_event.clear()if not hasattr(app, 'pika_thread') or not app.pika_thread.is_alive():app.pika_thread = Thread(target=consume_from_rabbitmq_and_enqueue, args=(rabbitmq_params, rabbitmq_queue, data_queue))app.pika_thread.start()return jsonify({'status': 'listening'}), 200else:return jsonify({'status': 'already listening'}), 200
2. 原理解析
多线程下的Event(事件)原理与使用是并发编程中的重要概念,它主要用于线程间的同步与通信。下面我将从原理、类型、状态以及使用方法等方面详细介绍Event。
2.1. Event原理
Event(事件)是一个内核对象,用于线程间的同步与通信。它管理着一个全局的内部标志(Flag),这个标志通常有两种状态:已设置(True)和未设置(False)。线程可以通过Event对象来等待某个事件的发生,或者通知其他线程某个事件已经发生。
2.2. Event类型
Event主要分为两种类型:
- 自动重置事件:当事件被触发(即Flag被设置为True)时,会唤醒一个正在等待的线程。随后,事件的Flag会自动重置为False,直到再次被触发。
- 手动重置事件:当事件被触发时,会唤醒所有正在等待的线程。事件的Flag保持为True,直到程序显式地将其重置为False。
2.3. Event状态
Event对象的状态主要由其内部标志(Flag)决定,标志有两种可能的状态:
- True(已设置):表示事件已发生,线程可以继续执行或不再被阻塞。
- False(未设置):表示事件未发生,线程将处于阻塞状态,直到事件被触发。
2.4. Event使用方法
在多线程编程中,Event对象通常通过以下几个方法进行操作:
- 创建Event对象:
import threading
event = threading.Event()
- 设置Event(触发事件):
使用set()方法将Event对象的内部标志设置为True,唤醒所有等待的线程。
event.set()
- 清除Event(重置事件):
使用clear()方法将Event对象的内部标志重置为False,使等待的线程再次进入阻塞状态(对于自动重置事件,这一步是自动的)。
event.clear()
- 等待Event(阻塞线程):
使用wait(timeout=None)方法使当前线程阻塞,直到Event对象的内部标志为True或调用超时。
event.wait() # 可以指定超时时间,如 event.wait(1) 表示等待1秒
- 检查Event状态:
使用is_set()或isSet()(旧式Python中)方法检查Event对象的内部标志是否为True。
if event.is_set(): print("事件已发生")
2.5. 使用场景
Event在多线程编程中有广泛的应用场景,如:
- 生产者-消费者模式:生产者线程在生成数据后触发Event,消费者线程等待Event来接收数据。
- 线程间同步:当多个线程需要按照特定顺序执行时,可以使用Event来控制执行流程。
- 条件等待:在某些条件下,线程需要等待某个事件的发生才能继续执行,这时可以使用Event来实现。
3. 重启线程解决方案
在多线程和多进程编程中,Event 对象是一个重要的同步原语。它允许一个线程向其他线程发送信号,以通知某些事件已经发生。具体到 stop_event 的使用,其主要作用是控制线程的启动和停止。
3.1. 修改代码
最初,是在start_listening中增加延时sleep代码,但是,没有起作用。
def start_listening():if stop_event.is_set():stop_event.clear()time.sleep(3)
有效的是在stop_listening中增加event.clear()代码。
@app.route('/startlistening', methods=['GET'])
def start_listening():if not stop_event.is_set():stop_event.clear()#time.sleep(2)# 此处,可以增加sleep延时启动监听if not stop_event.is_set():#stop_event.clear() if not hasattr(app,'pika_thread') or not app.pika_thread.is_alive():app.pika_thread = Thread(target = consume_from_rabbitmq_and_enqueue, args = (rabbitmq_params, rabbitmq_queue, data_queue))app.pika_thread.start()return jsonify({'status':'listening'}), 200else:return jsonify({'status':'already listening'}), 200else:stop_event.clear()return jsonify({'status':'error','message':'Stop event is set.'}), 400@app.route('/stoplistening', methods=['GET'])
def stop_listening():if hasattr(app,'pika_thread') and app.pika_thread.is_alive():stop_event.set()app.pika_thread.join() # 等待线程结束del app.pika_thread # 删除资源,确保重启成功time.sleep(2) stop_event.clear() # 新增起作用的代码!return jsonify({'status':'stopping'}), 200 else:return jsonify({'status':'not running'}), 400
3.2. 解释说明
- stop_event.clear() 在 start_listening 中的作用
在 start_listening 中调用 stop_event.clear() 确保了事件的内部标志被设置为 False。这意味着:
- 重新启动侦听线程时,如果事件的内部标志是 True,则不会启动新的线程。
- 通过 clear() 将标志设置为 False,确保新的线程可以启动,并开始侦听 RabbitMQ 消息。
- stop_event.clear() 在 stop_listening 中的作用
- 在 stop_listening 中调用 stop_event.clear() 确保了事件的内部标志被重置为 False,这样下一次调用 start_listening 时,不会被之前的停止状态影响。
- clear() 的调用位置在 time.sleep(2) 之后,确保了有足够的时间让线程完全停止并释放资源。
- 为什么只增加 sleep 不足以解决问题
仅仅增加 sleep 只是延迟了操作,并没有改变 stop_event 的状态。stop_event 仍然是设置状态(True),这会导致 start_listening 无法通过 if not stop_event.is_set() 的检查,从而不会启动新的线程。
3.3. 小结
在多线程控制中,使用 Event 对象来管理线程的启动和停止非常有效。set() 和 clear() 方法可以控制线程的执行和等待状态。正确地使用 stop_event.clear() 确保了你的应用程序在停止线程后可以重新启动新的线程,而不受之前的停止状态影响。这是你代码中的关键步骤,确保了线程控制逻辑的正确性。