文章目录
- 1. 背景&目标
- 2. show me the code
- 3.知识点
- 4. 总结
1. 背景&目标
背景:一个任务可分为多个阶段(各个阶段非CPU密集型任务,而是属于IO密集型任务),希望每个阶段能够交给各自的线程去执行。
目标:构建支持多并发的稳定的程序高效处理上述问题的程序,要求能够灵活设置并发。
2. show me the code
假设任务分为三个阶段,分别是download、resize和upload,代码采用管道将三个阶段进行拼接。
三个阶段的处理简化为三个函数,处理结果通过Queue传到下个阶段,各个阶段可以创建不同的线程去消费Queue中的数据,直到所有数据处理完成。代码如下:
from threading import Thread
from queue import Queue
import timemy_queue = Queue()class ClosableQueue(Queue):SENTINEL = object()def close(self):self.put(self.SENTINEL)def __iter__(self):while True:item = self.get()try:if item is self.SENTINEL:# 退出机制returnyield itemfinally:self.task_done()class Worker(Thread):def __init__(self, func, in_queue, out_queue):super().__init__()self.func = funcself.in_queue = in_queueself.out_queue = out_queuedef run(self):# 确保线程退出的机制:在生产者-消费者模型中,通常需要使用特殊标志(如 SENTINEL)通知消费者线程结束循环。for item in self.in_queue:result = self.func(item)self.out_queue.put(result)def download(obj):print(f'[download] id= {id(obj)}')time.sleep(0.1)return objdef resize(obj):print(f'[resize] id= {id(obj)}')time.sleep(0.01)return objdef upload(obj):print(f'[upload] id= {id(obj)}')time.sleep(1)return objdef start_threads(count, *args):threads = [Worker(*args) for _ in range(count)]for thread in threads:thread.start()return threadsdef stop_threads(closable_queue, threads):# close次数根据threads次数来,保障每个每个线程都能正确关闭for _ in threads:closable_queue.close()# 阻塞调用线程,直到队列中的所有任务都被处理完成# 每次向队列中添加一个任务(通过 .put()),队列内部的任务计数器会增加 1。# 每次调用 .task_done(),任务计数器会减少 1。# .join() 方法会一直阻塞,直到任务计数器降为 0,也就是队列中的所有任务都被标记为完成(通过调用 .task_done())closable_queue.join()for thread in threads:thread.join()if __name__ == '__main__':download_queue = ClosableQueue()resize_queue = ClosableQueue()upload_queue = ClosableQueue()done_queue = ClosableQueue()download_threads = start_threads(3, download, download_queue, resize_queue)resize_threads = start_threads(4, resize, resize_queue, upload_queue)upload_threads = start_threads(5, upload, upload_queue, done_queue)obj = object()for _ in range(1000):download_queue.put(obj)stop_threads(download_queue, download_threads)stop_threads(resize_queue, resize_threads)stop_threads(upload_queue, upload_threads)print(done_queue.qsize(), 'items fininished')
3.知识点
上述代码涉及到几个知识点,挺有意思的:
- 当我们想用queue来传递数据时,头疼的点在于:
①下游任务该怎么判断上游生产了数据呢?轮巡有点不优雅,可能会造成性能影响。
②上游任务啥时候告诉下游数据生产完毕了呢?可以通过插入一个特殊的数据告诉下游生产完毕了。
③队列该设置多大呢?如果下游数据消费不过来,上游一直生产数据插入到队列,容易oom。
④怎么判断中间队列的数据消费完毕了呢?即如何优雅地结束程序。
上述代码利用Queue类的特性比较优雅地解决了上述的几个问题:
① Queue非常优雅,设置size之后,如果size满了,put方法会阻塞,直到数据被消费了才可以往里面添加数据。当queue为空,get方法会阻塞,直到有数据进来。
② 如代码中SENTINEL对象,如果调用close方法,就往队列中插入一个哨兵对象,告诉下游,上游数据生产完毕了。
③ 代码中没有设置队列大小,但是Queue支持设置。
④ Queue中.join() 方法会一直阻塞,直到任务计数器降为 0,也就是队列中的所有任务都被标记为完成。
其中,任务计数器的原理为:
- 每次向队列中添加一个任务(通过 .put()),队列内部的任务计数器会增加 1。
- 每次调用 .task_done(),任务计数器会减少 1。
注意看,改写__iter__方法时,每次获取一个元素都调用了一次task_done()方法,即告诉任务计数器需要-1了。
4. 总结
这段代码还是比较精髓,其中关于队列Queue的用法,关于threads的用法和线程中共享数据的用法值得学习。