首先需要明确的是,多进程和其他语言的一样,能够利用多核cpu,但是python由于GIL的存在,多线程在执行的时候,实际上,每一时刻只有一个线程在执行。相当于是单线程。然而多线程在某些情况下,还是能够起到加速的效果。
需要了解的是,程序的耗时一般消耗在IO和CPU上,按照占比不同,一般分为IO密集型或者CPU密集型。比如文件读写、网络传输,磁盘IO等,属于IO密集型,而矩阵计算、数值计算这种就属于CPU密集型。在单线程中,遇见IO操作的时候,CPU会阻塞,直到IO操作完成,花费的时间成本为IO耗时加CPU耗时。但是在多线程中,遇见IO操作的时候,该线程会交出GIL,其他线程可以继续运行,这样可以让CPU和IO并行。因此,如果是IO密集型,即在代码中,主要是进行IO读取,那么多线程仍然能够起到加速左右,值得注意的是,这里的加速效果应该是来自于处理IO的设备,支持并行IO,即同一时刻,能够处理多个IO请求。反之,如果是CPU密集型,IO耗时忽略不计的话,此时多线程相当于是单线程,同时考虑到线程的上下文切换,那么多线程的运行时间反而会更多。
线程池的使用方法submit和map
python中concurrent.futures这个类提供了线程池和进程池的接口。as_completed按照任务的完成时间返回,map按照任务的添加时间返回
我们可以通过submit或map添加任务,但使用起来存在细微差别。
一般通过submit得到一个包含future对象的列表,然后通过concurrent.futures.as_completed去遍历这个列表,该方法会阻塞,可以设置超时时间。每当有任务完成的时候,就能通过future.result()得到任务执行的结果,该方法同样会阻塞,可以设置超时时间。因此通过这种方法,输出是按照任务执行完成的时间排序的。
当然,我们也可以不用as_completed去遍历,这样就按照任务的顺序返回。因为每个任务如果没完成就阻塞,完成了就添加。
import concurrent.futures
import timedef task(times):# 模拟任务执行time.sleep(times)return timesdef main():num_threads = 3with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:# 提交任务到线程池# submitfutures = [executor.submit(task, t) for t in [7, 1, 3, 8]]# # 收集每个任务的结果results = []# for future in concurrent.futures.as_completed(futures):# result = future.result()# results.append(result)for future in futures:result = future.result()results.append(result)# map# results = []# futures = executor.map(task, [7, 1, 3, 8])# for future in futures:# results.append(future)print(f"results = {results}")if __name__ == "__main__":main()
上面两种submit,依次输出1 3 7 8和7 1 3 8
map函数则不太一样,第一个参数是需要线程执行的函数,第二个参数是一个迭代器,会依此将参数应用到线程函数中。返回结果和列表的顺序一样。返回7 1 3 8
多线程和多进程的对比
线程安全
import concurrent.futures
import random
from threading import Lock
import time# 共享变量
shared_variable = 0
# 锁对象,用于保护共享变量的访问
lock = Lock()def task(task_id):global shared_variable# 模拟任务执行# 获取锁,确保对共享变量的访问是线程安全的for _ in range(1000000):with lock:shared_variable += 1# shared_variable += 1def main():num_threads = 2with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:# 提交任务到线程池futures = [executor.submit(task, i) for i in range(2)]# 收集每个任务的结果results = []for future in concurrent.futures.as_completed(futures):result = future.result()results.append(result)print(f"results = {results}")print(f"shared_variable = {shared_variable}")if __name__ == "__main__":main()
注释掉上面代码的lock,试试加锁和不加锁,可以很清晰的看到,加锁的时候不会有竞争冒险,而不加锁则可能有竞争冒险。因为几率是比较小的,观察不到的话,可以加大循环的次数。