目录
目标
Python版本
官方文档
概述
线程池
实战
创建线程池的基本语法
批量提交任务
生产者&消费者模型
目标
掌握线程池的基本概念和使用方法。
Python版本
Python 3.9.18
官方文档
concurrent.futures — Launching parallel taskshttps://docs.python.org/3.9/library/concurrent.futures.html
概述
线程池
线程的创建和销毁是需要消耗资源的,比如:CPU资源、内存、上下文切换等。线程池使得线程被创建后不停地被复用,大概过程是:
- 程序启动后线程池会预先创建一些线程,并将这些线程放入线程池中,这些线程是闲置着的,随时准备执行任务。
- 当需要任务执行时,程序会向线程池提交任务,线程池会取出一个空闲的线程来执行任务。
- 任务执行完成后线程不会被销毁,而是再次放回到线程池中,等待下一个任务。
我们创建线程池可以指定最大线程数量,也可以不指定。官方文档对于默认线程数量做了描述:
if max_workers is None:
# ThreadPoolExecutor is often used to:
# * CPU bound task which releases GIL
# * I/O bound task (which releases GIL, of course)
#
# We use cpu_count + 4 for both types of tasks.
# But we limit it to 32 to avoid consuming surprisingly large resource
# on many core machine. max_workers = min(32, (os.cpu_count() or 1) + 4)
即默认设置的最大线程数量是:
- 操作系统核心数量+4,
- 如果获取不到操作系统核心数量则最大线程数量是1个。
- 默认不超过32个线程数量。
实战
创建线程池的基本语法
import random
import time
from concurrent.futures import ThreadPoolExecutordef fun(url):time.sleep(random.randint(1,3))# 获取当前时间戳current_time = time.localtime()# 格式化时间为yyyy-MM-dd HH:mm:ssformatted_time = time.strftime('%Y-%m-%d %H:%M:%S', current_time)return f"请求到了{url}的网络数据:{formatted_time}"if __name__ == '__main__':executor = ThreadPoolExecutor(max_workers=4)task_1=executor.submit(fun, "www.baidu.com")task_2=executor.submit(fun, "www.bilibili.com")task_3=executor.submit(fun, "www.jd.com")task_4=executor.submit(fun, "www.taobao.com")task_5 = executor.submit(fun, "www.tianmao.com")task_1.done()task_2.done()task_3.done()task_4.done()task_5.done()result_1=task_1.result()result_2=task_2.result()result_3=task_3.result()result_4=task_4.result()result_5=task_5.result()print(result_1)print(result_2)print(result_3)print(result_4)print(result_5)print("主线程结束")
批量提交任务
需求
创建一个线程池,用来批量执行不同页面的请求任务。
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef get_page_html(page_num):time.sleep(random.randint(1,5))# 获取当前时间戳current_time = time.localtime()# 格式化时间为yyyy-MM-dd HH:mm:ssformatted_time = time.strftime('%Y-%m-%d %H:%M:%S', current_time)return f"请求到第{page_num}页数据:{formatted_time}"if __name__ == '__main__':executor=ThreadPoolExecutor()#请求第一页至第一百页的数据。page_num_list=[page_num for page_num in range(1,101)]task_list = [executor.submit(get_page_html,page_num) for page_num in page_num_list]#按照task_list元素的顺序返回结果,即使后面的页面数据提前请求到了数据,也会靠后返回。for task in task_list:print("按照task_list中的元素顺序返回数据:",task.result())#按照先执行完,先返回结果。for future in as_completed(task_list):print("按照先请求到数据就先返回数据:",future.result())print("主线程结束")
生产者&消费者模型
需求
每次生产出一只烤鸭,就会被等待的消费者消费,最多生产100000只烤鸭。要求用线程池实现。
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor# 最多生产烤鸭数量
max_duck_count = 100000
# 当前生产的烤鸭数量
duck_count = 0
# 把生产的烤鸭放入队列中
duck_queue = queue.Queue()def produce_duck(duck_queue):global duck_count# 生产者不停地生产烤鸭while True:if duck_count >= max_duck_count:breakduck_count += 1duck_queue.put(duck_count)print(f"===========生产者{threading.current_thread().name}生产了{duck_count}只烤鸭。")# 所有生产者完成后放入退出信号duck_queue.put(None)def consume_duck(duck_queue):# 消费者不停地消费烤鸭while True:duck = duck_queue.get()if duck is None:# 收到退出信号后,消费者退出duck_queue.put(None) # 传递退出信号给其他消费者breakprint(f"——————————消费者{threading.current_thread().name}消费了{duck}只烤鸭。")# 初始化线程,设置名称
def init_thread():thread_id = threading.get_ident()threading.current_thread().name = f"线程_{thread_id}"if __name__ == '__main__':thread_pool = ThreadPoolExecutor(max_workers=10,initializer=init_thread)# 启动多个生产者线程for i in range(5):thread_pool.submit(produce_duck, duck_queue)# 启动多个消费者线程for i in range(8):thread_pool.submit(consume_duck, duck_queue)# 等待线程池中的任务执行完毕thread_pool.shutdown(wait=True)print("主线程结束。")