介绍:
线程(Threads)是操作系统提供的一种轻量级的执行单元,可以在一个进程内并发执行多个任务。每个线程都有自己的执行上下文,包括栈、寄存器和程序计数器。
在Python中,可以使用threading
模块创建和管理线程。线程可以同时执行多个任务,可以在一个线程中执行耗时操作,而不会阻塞其他线程的执行。线程之间共享进程的资源,如内存空间,因此需要注意线程安全的问题。
然而,Python的线程在特定情况下可能会受到全局解释器锁(Global Interpreter Lock,GIL)的限制。GIL是一种机制,它确保同一时刻只有一个线程可以执行Python字节码。这意味着在多线程场景下,即使有多个线程,也无法真正实现并行执行。因此,在CPU密集型的任务中,Python的线程并不能充分利用多核处理器的能力。
1、导入threading
模块
在使用Python线程之前,首先需要导入 threading
模块。可以使用以下语句导入该模块:
import threading
2、创建线程
使用 threading.Thread
类创建线程对象。可以通过传递一个可调用的目标函数和其他参数来实例化线程对象。目标函数是线程实际执行的任务。
# 定义一个目标函数作为线程的执行任务
def my_task(arg1, arg2):# 执行任务的代码# 创建线程对象
my_thread = threading.Thread(target=my_task, args=(arg1, arg2))
3、启动线程
通过调用线程对象的 start()
方法来启动线程。启动线程后,它将在后台运行,执行目标函数中的代码。
my_thread.start()
4、等待线程完成
可以使用 join()
方法等待线程执行完毕。调用 join()
方法会阻塞当前线程,直到目标线程执行完成。
my_thread.join()
5、线程同步
在多线程编程中,线程之间的同步是一项重要的任务,旨在确保线程按照预期的顺序执行,并避免竞态条件和数据不一致的问题。Python提供了几种同步原语,常用的包括锁(Lock)、信号量(Semaphore)、事件(Event)和条件变量(Condition)。下面详细介绍这些同步原语的特点和使用方法:
锁(Lock)
锁是一种最基本的同步原语,在Python中由 threading.Lock
类实现。它提供了两个主要方法:acquire()
和 release()
。一个线程可以通过调用 acquire()
来获取锁,如果锁当前没有被其他线程持有,则该线程将获得锁并继续执行,否则将被阻塞直到锁被释放。当线程完成对临界区的访问后,应该调用 release()
来释放锁,以便其他线程可以获取它。
import threading# 创建锁对象
lock = threading.Lock()# 线程函数
def thread_function():lock.acquire()# 临界区代码lock.release()
锁还支持上下文管理器的使用方式,可以使用 with
语句来自动获取和释放锁:
import threading# 创建锁对象
lock = threading.Lock()# 线程函数
def thread_function():with lock:# 临界区代码
信号量(Semaphore)
信号量是一种更高级的同步原语,用于控制对共享资源的并发访问。Python中的信号量由 threading.Semaphore
类实现。信号量维护一个内部计数器,线程可以通过调用 acquire()
来减少计数器的值,如果计数器为零,则线程将被阻塞。线程在完成对共享资源的访问后,应该调用 release()
来增加计数器的值,以便其他线程可以获取信号量。
import threading# 创建信号量对象
semaphore = threading.Semaphore(value=3) # 设置初始计数器值为3# 线程函数
def thread_function():semaphore.acquire()# 访问共享资源semaphore.release()
信号量的计数器可以控制同时访问共享资源的线程数量。
事件(Event)
事件是一种用于线程间通信的同步原语,由 threading.Event
类实现。事件有两种状态:已设置和未设置。线程可以通过调用 set()
来设置事件,将其状态设置为已设置;通过调用 clear()
可以将事件状态设置为未设置。线程可以通过调用 wait()
来等待事件的设置,如果事件已设置,则线程可以继续执行,否则将被阻塞。
import threading# 创建事件对象
event = threading.Event()# 线程函数
def thread_function():event.wait() # 等待事件设置# 执行操作# 主线程设置事件
event.set()
事件还可以使用 is_set()
方法来检查事件的状态。
条件变量(Condition)
条件变量是一种复杂的同步原语,由 threading.Condition
类实现。它提供了一个条件队列,允许线程等待某个条件的发生。条件变量结合锁一起使用,可以实现更复杂的线程间同步。
import threading# 创建条件变量对象
condition = threading.Condition()# 线程函数 A
def thread_function_a():with condition:while not condition_predicate():condition.wait()# 执行操作# 线程函数 B
def thread_function_b():with condition:# 修改条件condition.notify() # 通知等待的线程# 主线程
with condition:# 修改条件condition.notify() # 通知等待的线程
在线程函数 A 中,线程会等待条件谓词成立的情况下才继续执行,否则会调用 wait()
方法将线程挂起。线程函数 B 可以在某个条件发生变化时调用 notify()
方法来通知等待的线程。
6、共享数据
共享数据是指多个线程同时访问和修改的数据。当多个线程同时读写共享数据时,可能会发生竞态条件(Race Condition)和数据损坏的问题。为了确保线程安全性,需要采取适当的同步措施来保护共享数据。以下是一些常用的同步机制和技术:
锁(Lock)
锁是一种最常见的同步原语,用于保护共享数据的互斥访问。在多线程环境中,一个线程可以通过获取锁来独占地访问共享数据,其他线程必须等待锁释放后才能访问。锁可以使用 threading.Lock
类来实现,通过调用 acquire()
和 release()
方法来获取和释放锁。
import threading# 创建锁对象
lock = threading.Lock()# 共享数据
shared_data = 0# 线程函数
def thread_function():global shared_datalock.acquire()# 访问和修改共享数据shared_data += 1lock.release()
在访问共享数据之前获取锁,确保同一时间只有一个线程可以修改数据,从而避免竞态条件。
信号量(Semaphore)
信号量也可以用于保护共享数据的访问,在多线程环境中控制并发访问的数量。信号量维护一个内部计数器,线程在访问共享数据之前通过获取信号量来减少计数器的值,如果计数器为零,则线程将被阻塞。线程在完成对共享数据的访问后,通过释放信号量来增加计数器的值,从而允许其他线程继续访问。
import threading# 创建信号量对象
semaphore = threading.Semaphore()# 共享数据
shared_data = 0# 线程函数
def thread_function():global shared_datasemaphore.acquire()# 访问和修改共享数据shared_data += 1semaphore.release()
通过适当设置信号量的初始值,可以控制同时访问共享数据的线程数量。
其他同步原语
Python还提供了其他一些同步原语,如条件变量(Condition)和事件(Event)。它们可以用于更复杂的同步需求,如线程之间的通信和等待特定条件的发生。
import threading# 创建条件变量对象
condition = threading.Condition()# 共享数据
shared_data = []# 线程函数 A
def thread_function_a():with condition:while not condition_predicate():condition.wait()# 访问和修改共享数据# 线程函数 B
def thread_function_b():with condition:# 修改条件condition.notify() # 通知等待的线程
在上述示例中,线程函数 A等待条件谓词成立的情况下才能访问共享数据,线程函数 B在条件发生变化时通过 notify()
方法通知等待的线程。
7、线程状态
线程状态是指线程在不同的时间点上所处的状态,它反映了线程的执行情况和可用性。在多线程编程中,线程可以处于以下几种不同的状态:
新建(New)状态
当创建线程对象但尚未启动线程时,线程处于新建状态。此时线程对象已经被创建,但尚未分配系统资源和执行代码。可以通过实例化线程类或者从线程池中获取线程来创建新线程。
import threading# 创建新线程对象
thread = threading.Thread(target=thread_function)
就绪(Runnable)状态
当线程准备好执行,但由于系统调度的原因还未开始执行时,线程处于就绪状态。线程已经分配了系统资源,并且等待调度器将其放入运行队列中。多个就绪状态的线程可能会竞争CPU资源,调度器会根据调度算法决定哪个线程被选中执行。
运行(Running)状态
当线程获得CPU资源并开始执行线程函数时,线程处于运行状态。此时线程的代码正在被执行,它可能会与其他线程并发执行或通过时间片轮转进行切换。只有一个线程可以处于运行状态。
阻塞(Blocked)状态
当线程被暂停执行,等待某个条件的发生时,线程处于阻塞状态。在阻塞状态下,线程不会占用CPU资源,直到满足特定条件后才能继续执行。常见的阻塞原因包括等待I/O操作、获取锁失败、等待其他线程的通知等。
终止(Terminated)状态
当线程完成了它的执行任务或被显式终止时,线程处于终止状态。线程函数执行完毕或者出现异常时,线程将自动终止。也可以通过调用线程对象的 join()
方法来等待线程执行完毕。
# 等待线程执行完毕
thread.join()
线程状态之间可以相互转换,线程的状态转换通常由操作系统的调度器和线程的执行情况决定。例如,当线程处于就绪状态并获得CPU资源时,它将进入运行状态;当线程在执行期间发生阻塞,它将进入阻塞状态;当线程执行完毕或被终止时,它将进入终止状态。
8、线程属性和方法
threading.Thread
类提供了一些属性和方法来管理和操作线程。其中一些常用的属性和方法包括:
name
:获取或设置线程的名称。ident
:获取线程的标识符。is_alive()
:检查线程是否处于活动状态。setDaemon(daemonic)
:将线程设置为守护线程,当主线程退出时,守护线程也会被终止。start()
:启动线程。join(timeout)
:等待线程执行完成,可选地设置超时时间。run()
:线程的执行入口点,在线程启动时被调用。sleep(secs)
:线程休眠指定的秒数。
9、线程间通信
线程间通信是指在多线程编程中,多个线程之间进行数据传递和共享的过程。线程间通信的目的是实现线程之间的协作和数据交换,以完成复杂的任务。在Python中,可以使用 queue
模块提供的队列来实现线程安全的数据传递。
queue
模块提供了几种队列类型,常用的有以下三种:
Queue(先进先出队列)
Queue
是最常用的线程安全队列,它使用先进先出(FIFO)的方式存储和获取数据。多个线程可以安全地将数据放入队列中,并从队列中获取数据。Queue
类提供了以下常用方法:
put(item[, block[, timeout]])
:将数据放入队列,可指定是否阻塞和超时时间。get([block[, timeout]])
:从队列中获取数据,可指定是否阻塞和超时时间。empty()
:判断队列是否为空。full()
:判断队列是否已满。qsize()
:返回队列中的元素数量。
import queue# 创建队列对象
q = queue.Queue()# 线程函数 A
def thread_function_a():while True:item = q.get()# 处理数据# 线程函数 B
def thread_function_b():while True:# 产生数据q.put(item)
在上述示例中,线程函数 A从队列中获取数据并进行处理,线程函数 B产生数据并放入队列中,两个线程通过队列进行数据交换。
LifoQueue(后进先出队列)
LifoQueue
是一种后进先出(LIFO)的队列类型,与 Queue
不同的是,它的获取顺序与放入顺序相反。其他方法与 Queue
类相同。
import queue# 创建后进先出队列对象
q = queue.LifoQueue()
后进先出队列适用于某些特定的场景,例如需要按照相反的顺序处理数据。
PriorityQueue(优先级队列)
PriorityQueue
是一种根据优先级排序的队列类型,可以为队列中的每个元素指定一个优先级。优先级高的元素先被获取。元素的优先级可以是数字、元组或自定义对象。其他方法与 Queue
类相同。
import queue# 创建优先级队列对象
q = queue.PriorityQueue()
优先级队列适用于需要根据优先级顺序处理数据的场景。
10、线程池
线程池是一种用于管理和复用线程的机制,可以有效地管理大量线程的生命周期,并提供简化的接口来提交和管理任务。在Python中,可以使用 concurrent.futures
模块中的 ThreadPoolExecutor
类来创建线程池。
线程池的特点
- 线程复用:线程池中的线程可以被重复使用,避免了线程频繁创建和销毁的开销。
- 线程管理:线程池负责管理线程的生命周期,包括线程的创建、销毁和回收。
- 并发控制:线程池可以限制并发执行的线程数量,防止系统资源被过度占用。
- 异步提交:线程池提供了异步提交任务的方法,可以在后台执行任务并返回结果。
创建线程池
可以使用 ThreadPoolExecutor
类来创建线程池。可以指定线程池的大小(可同时执行的线程数量)和其他相关参数。
from concurrent.futures import ThreadPoolExecutor# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
在上述示例中,创建了一个最大同时执行 5 个线程的线程池。
提交任务
可以使用线程池的 submit()
方法提交任务,该方法会返回一个 Future
对象,用于获取任务的执行结果。
# 定义任务函数
def task_function():# 任务逻辑# 提交任务到线程池
future = pool.submit(task_function)
在上述示例中,将任务函数 task_function
提交到线程池,并获得了一个 Future
对象。
获取任务结果
可以使用 Future
对象的 result()
方法来获取任务的执行结果。如果任务尚未完成,result()
方法将会阻塞直到任务完成并返回结果。
# 获取任务结果
result = future.result()
关闭线程池
在使用完线程池后,应该调用 shutdown()
方法来关闭线程池。关闭线程池后,将不再接受新的任务提交,并且会等待所有已提交的任务执行完毕后再退出。
# 关闭线程池
pool.shutdown()