Python中线程的实现
1. 线程
在Python中,threading
库提供了线程的接口。我们通过threading
中提供的接口创建、启动、同步线程。
例1. 使用线程旋转指针
想象一个场景:程序执行了一个耗时较长的操作,如复制一个大文件,我们希望这个过程中程序显示一个动画,表示程序正常运行没有卡死。
简化一下:启动一个函数,执行 3 秒。在这3秒内,在终端持续显示指针旋转的动画。下面用线程来实现这个操作。
注:本例代码主要来自《流畅的Python》(第二版) 19.4.1
首先我们定义旋转函数spin
和阻塞函数slow
。
spin
函数每隔0.1s依次打印\|/-
,看起来就像是指针转动:
import itertools
import time
def spin(msg: str) -> None: for char in itertools.cycle(r'\|/-'): status = f'\r{char} {msg}' print(status, end='', flush=True)time.sleep(0.1)blanks = ' ' * len(status)print(f'\r{blanks}\r', end='')if __name__ == '__main__':spin("thinking...")
slow
函数用来模拟一个耗时的操作。这里我们直接调用time.sleep(3)
等待3秒,然后返回一个结果。
# 阻塞3秒,并返回42
def slow() -> int:time.sleep(3) return 42
调用
time.sleep()
阻塞所在的线程,但是释放 GIL,其他 Python 线程可以继续运行。
现在,我们要用线程实现并发。看起来就像是slow
和spin
同时进行。
下面对spin
函数做了一些修改,通过threading.Event
信号量来同步线程。
import itertools
import time
from threading import Thread, Event# 旋转
def spin(msg: str, done: Event) -> None: # done用于同步线程for char in itertools.cycle(r'\|/-'): status = f'\r{char} {msg}' print(status, end='', flush=True)if done.wait(.1): #等待/阻塞 。除非有其他线程set了这个事件,则返回True;或者经过指定的时间(0.1s)后,返回 False。breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')# 阻塞3秒,并返回42
def slow() -> int:time.sleep(3) return 42
使用线程来并发执行两个函数。
下面我们只手动启动了一个spinner线程,因为程序本身就有一个主线程。
def supervisor() -> int: done = Event() # 信号量,用于线程同步spinner = Thread(target=spin, args=('thinking!', done)) # 使用Thread创建线程实例spinner。print(f'spinner object: {spinner}') spinner.start() # 启动spinner线程result = slow() # 调用slow,阻塞 main 线程。同时,次线程spinner运行旋转指针动画done.set() # 设置done为真,唤醒等待done的线程。结束spinner中的循环。spinner.join() # 等待spinner 线程结束。-貌似这里加不加都不影响。return resultdef main() -> None:result = supervisor() print(f'Answer: {result}')if __name__ == '__main__':main()
程序的执行顺序,主要步骤都发生在supervisor
函数中,我们跳过main从supervisor开始看。
由于GIL的存在,同一时刻只有一个线程在执行。所以下面是一个顺序执行的过程。
执行过程大致如下:
主线程:创建spinner
线程,启动spinner
线程
spinner线程:输出字符,然后遇到done.wait(.1)
阻塞自己。
主线程:调用slow
函数,遇到time.sleep(3)
阻塞
spinner线程:done.wait(.1)
超过了0.1秒返回False,继续输出字符。重复进行阻塞0.1秒、输出字符。
3秒后…
主线程:slow执行完毕,返回结果42。主线程继续执行done.set()
,这会唤醒等待done的线程spinner。
spinner线程:运行到done.wait(.1)
,由于主线程执行了done.set()
使得这里的结果为True,所以执行break,结束循环。执行循环下面的print语句后spinner线程结束。
主线程:返回结果。
例2.计算因子
第二个例子我们看一个(失败的)并行计算的例子:
我们希望用n个线程并行计算n个数各自的因子。
注:本例代码来自《Effective Python》(第二版) 第53章
基准方法:
逐个计算。
import time# 计算number的因子
def factorize(number):for i in range(1, number + 1):if number % i == 0:yield inumbers = [2139079, 1214759, 1516637, 1852285, 14256346, 12456533]
start = time.time()for number in numbers:list(factorize(number))end = time.time()
delta = end - start
print(f'串行方法花费了 {delta:.3f} 秒')
多线程方式:
可以像例1中使用Thread函数实现线程:
def get_factor(number):factors = list(factorize(number))return factorsstart = time.time()
threads = []
for number in numbers:thread = Thread(target=get_factor, args=(number,))thread.start() # 启动threads.append(thread)# 等待所有线程完成
for thread in threads:thread.join() # 等待完成end = time.time()
delta = end - start
print(f'Thread方法花费了 {delta:.3f} 秒')
实现线程的另一种方式是继承Thread
类并实现run
方法:
from threading import Thread# 继承Thread,需要实现run方法,在run方法中执行要做的事情
class FactorizeThread(Thread):def __init__(self, number):super().__init__()self.number = numberdef run(self):self.factors = list(factorize(self.number))start = time.time()threads = []
for number in numbers:thread = FactorizeThread(number)thread.start() # 启动threads.append(thread)# 等待所有线程完成
for thread in threads:thread.join() # 等待完成end = time.time()
delta = end - start
print(f'Thread方法花费了 {delta:.3f} 秒')
运行结果:
你会发现这个多线程的版本并没有变快,这并不意外。
介绍线程时说过,因为GIL的存在,多线程无法同时执行,甚至因为创建和切换线程产生额外的开销导致耗时增加。
小结:
在GIL的限制下,Python线程对于并行计算没有用处,但是对于等待(IO、网络、后台任务)是有用处的。下一节我们会看一些Python线程的实际案例。