https://www.cnblogs.com/tkqasn/p/5700281.html
threading — 基于线程的并行
threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。python当前版本的多线程库没有实现优先级、线程组,线程也不能被停止、暂停、恢复、中断。
threading模块提供的类:
- Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
threading 模块提供的常用方法:
- threading.current_thread():获取当前的线程对象。如果调用方的控制线程不是通过线程模块创建的,则返回功能有限的虚拟线程对象。
- threading.currentThread(): 返回当前的线程变量。
- threading.get_ident():获取线程标识符。标识符还可以通过threading.currentThread().ident获取
- threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
- threading.main_thread():获取主线程对象
threading 模块提供的常量:
- threading.TIMEOUT_MAX 设置threading全局超时时间。
Thread类
Thread是线程类,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖run():
# coding:utf-8
import threading
import time- 方法一:将要执行的方法作为参数传给Thread的构造方法
def action(arg):time.sleep(1)print 'the arg is:%s\r' %argfor i in xrange(4):t =threading.Thread(target=action,args=(i,))t.start()print 'main thread end!'- 方法二:从Thread继承,并重写run()
class MyThread(threading.Thread):def __init__(self,arg):super(MyThread, self).__init__()#注意:一定要显式的调用父类的初始化函数。self.arg=argdef run(self):#定义每个线程要运行的函数time.sleep(1)print 'the arg is:%s\r' % self.argfor i in xrange(4):t =MyThread(i)t.start()print 'main thread end!'
构造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
- group: 线程组,目前还没有实现,库引用中提示必须是None;
- target: 要执行的方法;
- name: 线程名;
- args/kwargs: 要传入方法的参数。
实例方法:
- isAlive(): 返回线程是否在运行。正在运行指启动后、终止前。
- get/setName(name): 获取/设置线程名。
- run():执行start()方法会调用run(),该方将创建Thread对象时传递给target的函数名,和传递给args、kwargs的参数组合成一个完整的函数,并执行该函数。run()方法一般在自定义Thead类时会用到。
- is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止 - start(): 启动线程。
- join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。
- ident:“线程标识符”,如果线程尚未启动,则为None。如果线程启动是一个非零整数。
- daemon:如果thread.daemon=True表示该线程为守护线程,必须在调用Start()之前设置此项,否则将引发RuntimeError。默认为False
- isDaemon():判断一个线程是否是守护线程。
使用例子一(未设置setDeamon):
# coding:utf-8
import threading
import timedef action(arg):time.sleep(1)print 'sub thread start!the thread name is:%s' % threading.currentThread().getName()print 'the arg is:%s' %argtime.sleep(1)for i in xrange(4):t =threading.Thread(target=action,args=(i,))t.start()print 'main_thread end!'
main_thread end
sub thread start. the thread name is:Thread-1sub thread start. the thread name is:Thread-3the arg is:0the arg is:2sub thread start. the thread name is:Thread-2the arg is:1
sub thread start. the thread name is:Thread-4
the arg is:3Process finished with exit code 0
可以看出,创建的4个“前台”线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
验证了serDeamon(False)(默认)前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,主线程停止。
使用例子二(setDeamon=True)
# coding=utf-8
import threading
import timedef action(arg):time.sleep(1)print("sub thread start. the thread name is:%s"%threading.currentThread().getName())print("the arg is:%s"%arg)time.sleep(1)for i in range(4):t = threading.Thread(target=action,args=(i,))t.setDaemon(True)t.start()print('main_thread end')
main_thread end!Process finished with exit code 0
可以看出,主线程执行完毕后,后台线程不管是成功与否,主线程均停止
验证了setDeamon(True)后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程均停止。
使用例子三(设置join)
# coding=utf-8
import threading
import timedef action(arg):time.sleep(1)print("sub thread start. the thread name is:%s、r"%threading.currentThread().getName())print("the arg is:%s\r"%arg)time.sleep(1)thread_list = []for i in xrange(4):t = threading.Thread(target=action,args=(i,))t.setDaemon(True)thread_list.append(t)for t in thread_list:t.start()for t in thread_list:t.join()print('main_thread end')
sub thread start. the thread name is:Thread-2
the arg is:1
sub thread start. the thread name is:Thread-1
the arg is:0
sub thread start. the thread name is:Thread-3
the arg is:2
sub thread start. the thread name is:Thread-4
the arg is:3
main_thread endProcess finished with exit code 0
设置join之后,主线程等待子线程全部执行完成后或者子线程超时后,主线程才结束
复制代码
验证了 join()阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout,即使设置了setDeamon(True)主线程依然要等待子线程结束。
使用例子四(join不妥当的用法,使多线程编程顺序执行)
# coding=utf-8
import threading
import timedef action(arg):time.sleep(1)print("sub thread start. the thread name is:%s\r"%threading.currentThread().getName())print("the arg is:%s\r"%arg)time.sleep(1)for i in xrange(4):t = threading.Thread(target=action,args=(i,))t.setDaemon(True)t.start()t.join()print('main_thread end')
sub thread start!the thread name is:Thread-1
the arg is:0
sub thread start!the thread name is:Thread-2
the arg is:1
sub thread start!the thread name is:Thread-3
the arg is:2
sub thread start!the thread name is:Thread-4
the arg is:3
main_thread end!Process finished with exit code 0
可以看出此时,程序只能顺序执行,每个线程都被上一个线程的join阻塞,使得“多线程”失去了多线程意义。
复制代码
Lock、Rlock类
由于线程之间随机调度:某线程可能在执行n条后,CPU接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。
Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
简言之:Lock属于全局,Rlock属于线程。
构造方法:
Lock(),Rlock(),推荐使用Rlock()
实例方法:
- acquire([timeout]): 尝试获得锁定。使线程进入同步阻塞状态。
- release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
例子一(未使用锁):
# coding=utf-8
import threading
import timeg_num = 0def show(arg):global g_numtime.sleep(1)g_num += 1print g_numfor i in range(10):t = threading.Thread(target=show, args=(i,))t.start()print "main thread stop"
main thread stop
1
235
64
6789
多次运行可能产生混乱。这种场景就是适合使用锁的场景。
复制代码
例子二(使用锁):
# coding:utf-8
import threading
import timegl_num = 0
lock = threading.RLock()# 调用acquire([timeout])时,线程将一直阻塞,
# 直到获得锁定或者直到timeout秒后(timeout参数可选)。
# 返回是否获得锁。
def Func():lock.acquire()global gl_numgl_num += 1time.sleep(1)print gl_numlock.release()for i in range(10):t = threading.Thread(target=Func)t.start()
1
2
3
4
5
6
7
8
9
10Process finished with exit code 0
可以看出,全局变量在在每次被调用时都要获得锁,才能操作,因此保证了共享数据的安全性
复制代码
Lock对比Rlock
#coding:utf-8import threading
lock = threading.Lock() #Lock对象
lock.acquire()
lock.acquire() #产生了死锁。
lock.release()
lock.release()
print lock.acquire()import threading
rLock = threading.RLock() #RLock对象
rLock.acquire()
rLock.acquire() #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()
Condition类
Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
构造方法:
Condition([lock/rlock])
实例方法:
- acquire([timeout])/release(): 调用关联的锁的相应方法。
- wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。 当wait接收notify醒来时,已经通过acquire获得锁定。小心重复acquire操作.
- notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 notify(n)表示通知n个等待的线程
- notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
例子一:生产者消费者模型
import threading
import time# 商品
product = None
# 条件变量
con = threading.Condition()# 生产者方法
def produce():global productif con.acquire():while True:if product is None:print 'produce...'product = 'anything'# 通知消费者,商品已经生产con.notify()# 等待通知con.wait()time.sleep(2)# 消费者方法
def consume():global productif con.acquire():while True:if product is not None:print 'consume...'product = None# 通知生产者,商品已经没了con.notify()# 等待通知con.wait()time.sleep(2)t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...Process finished with exit code -1
程序不断循环运行下去。重复生产消费过程。
复制代码
例子二:生产者消费者模型
# coding=utf-8
import threading
import time
import randomMAX_NUM = 10class Producer(threading.Thread):'''生产者线程类'''def __init__(self, storage, consumeCon, produceCon):super(Producer, self).__init__()self.storage = storageself.consumeCon = consumeConself.produceCon = produceConself.acq_by_wait = Falsedef run(self):while True:self.do_round()time.sleep(1)def do_round(self):'''acq_by_wait标志是否是wait之后被notify唤醒的若是,notify唤醒自动acquire若不是,需要再次主动acquire:return:'''if not self.acq_by_wait:self.produceCon.acquire()if len(self.storage) < MAX_NUM:d = random.randint(1,99)self.storage.append(d)print('_'.join(('produce:', str(threading.currentThread()), str(d), str(len(self.storage)))))self.consumeCon.notify()self.produceCon.release()self.acq_by_wait = Falseelse:print('storage full, waiting...'+str(threading.currentThread()))self.produceCon.wait()self.acq_by_wait = Trueclass Consumer(threading.Thread):'''消费者线程类'''def __init__(self, storage, consumeCon, produceCon):super(Consumer,self).__init__()self.storage = storageself.consumeCon = consumeConself.produceCon = produceConself.acq_by_wait = Falsedef run(self):while True:'''acq_by_wait标志是否wait周被notify唤醒若是,notify唤醒后自动acquire否则,需要再次互动acquire:return: '''if not self.acq_by_wait:self.consumeCon.acquire()if len(self.storage) > 0:print('_'.join((' Consumer:', str(threading.currentThread()), str(self.storage.pop()),str(len(self.storage)))))self.produceCon.notify()self.consumeCon.release()self.acq_by_wait = Falseelse:print(' Consumer waiting...storage empty'+str(threading.currentThread()))self.consumeCon.wait()self.acq_by_wait = Truedef Test():CONSUMER_NUM = 2PRODUCER_NUM = 10storageList = []storageLock = threading.Lock()consumeCon = threading.Condition(lock=storageLock)produceCon = threading.Condition(lock=storageLock)tAllList = []tConsumerList = []for _ in xrange(CONSUMER_NUM):tConsumer = Consumer(storageList, consumeCon, produceCon)tConsumerList.append(tConsumer)tAllList.append(tConsumer)tProducerList = []for _ in xrange(PRODUCER_NUM):tProducer = Producer(storageList, consumeCon, produceCon)tProducerList.append(tProducer)tAllList.append(tProducer)for t in tAllList:t.start()if __name__ == '__main__':Test()
Consumer waiting...storage empty<Consumer(Thread-1, started 12572)>Consumer waiting...storage empty<Consumer(Thread-2, started 11944)>
produce:_<Producer(Thread-3, started 12584)>_91_1Consumer:_<Consumer(Thread-1, started 12572)>_91_0
produce:_<Producer(Thread-4, started 11940)>_44_1Consumer:_<Consumer(Thread-1, started 12572)>_44_0
produce:_<Producer(Thread-5, started 11228)>_71_1Consumer:_<Consumer(Thread-2, started 11944)>_71_0Consumer waiting...storage empty<Consumer(Thread-1, started 12572)>
例子三:随机获取
import threadingalist = None
condition = threading.Condition()def doSet():if condition.acquire():while alist is None:condition.wait()for i in range(len(alist))[::-1]:alist[i] = 1condition.release()def doPrint():if condition.acquire():while alist is None:condition.wait()for i in alist:print i,printcondition.release()def doCreate():global alistif condition.acquire():if alist is None:alist = [0 for i in range(10)]condition.notifyAll()condition.release()tset = threading.Thread(target=doSet,name='tset')
tprint = threading.Thread(target=doPrint,name='tprint')
tcreate = threading.Thread(target=doCreate,name='tcreate')
tset.start()
tprint.start()
tcreate.start()
结果有两种可能
0 0 0 0 0 0 0 0 0 0
或
1 1 1 1 1 1 1 1 1 1
[Bounded]Semaphore
一个信号量管理一个内部计数器,该计数器因 acquire() 方法的调用而递减,因 release() 方法的调用而递增。 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。
信号量对象也支持 上下文管理协议
构造方法:
threading.Semaphore(value=1)
该类实现信号量对象。信号量对象管理一个原子性的计数器.
可选参数 value 赋予内部计数器初始值,默认值为 1 。如果 value 被赋予小于0的值,将会引发 ValueError 异常。
class threading.BoundedSemaphore(value=1)
该类实现有界信号量。有界信号量通过检查以确保它当前的值不会超过初始值。如果超过了初始值,将会引发 ValueError 异常。在大多情况下,信号量用于保护数量有限的资源。如果信号量被释放的次数过多,则表明出现了错误。没有指定时, value 的值默认为1。
实例方法
acquire(blocking=True, timeout=None)
获取一个信号量。
- 如果内部计数器大于0,将其减1并立即返回True。
- 如果内部计数器为0,则将其阻塞知道被release()调用唤醒。每次release()只唤醒一个线程。
release()
释放一个信号量,将内部计数器的值增加1。当计数器原先的值为0且有其它线程正在等待它再次大于0时,唤醒正在等待的线程。
示例
信号量主要用在保护有限的资源。以数据库连接数为例说明,假设当前数据库支持最大连接数为3,将信号量初始值设为3,那么同时最大可以有三个线程连接数据库,其他线程若再想连接数据库,则只有等待,直到某一个线程释放数据库连接。
import threading
import timesm=threading.Semaphore(3)def connectDb():sm.acquire()print threading.currentThread().getName()+' connecting to db...\n'time.sleep(2)print threading.currentThread().getName()+' released db...\n'sm.release()if __name__ == '__main__':s1=threading.Thread(target=connectDb,args=())s2 = threading.Thread(target=connectDb, args=())s3 = threading.Thread(target=connectDb, args=())s4 = threading.Thread(target=connectDb, args=())s1.start()s2.start()s3.start()s4.start()
结果:
C:\Python27\python.exe E:/pythonproj/基础练习/t8.py
Thread-1 connecting to db...Thread-2 connecting to db...Thread-3 connecting to db...Thread-1 released db...Thread-2 released db...
Thread-3 released db...Thread-4 connecting to db...Thread-4 released db...Process finished with exit code 0
Event类
Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。
Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
构造方法:
Event()
实例方法:
- isSet(): 当内置标志为True时返回True。
- set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
- clear(): 将标志设为False。
- wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
例子一
# encoding: UTF-8
import threading
import timeevent = threading.Event()def func():# 等待事件,进入等待阻塞状态print '%s wait for event...' % threading.currentThread().getName()event.wait()# 收到事件后进入运行状态print '%s recv event.' % threading.currentThread().getName()t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()time.sleep(2)# 发送事件通知
print 'MainThread set event.'
event.set()
Thread-1 wait for event...
Thread-2 wait for event...#2秒后。。。
MainThread set event.
Thread-1 recv event.Thread-2 recv event.
timer类
Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。
构造方法:
Timer(interval, function, args=[], kwargs={})
- interval: 指定的时间
- function: 要执行的方法
- args/kwargs: 方法的参数
实例方法:
Timer从Thread派生,没有增加实例方法。
例子一:
# encoding: UTF-8
import threadingdef func():print 'hello timer!'timer = threading.Timer(5, func)
timer.start()
线程延迟5秒后执行。
Barrier类
栅栏类提供一个简单的同步原语,用于应对固定数量的线程需要彼此相互等待的情况。线程调用 wait() 方法后将阻塞,直到所有线程都调用了 wait() 方法。此时所有线程将被同时释放。
栅栏对象可以被多次使用,但进程的数量不能改变。
这是一个使用简便的方法实现客户端进程与服务端进程同步的例子:
b = Barrier(2, timeout=5)def server():start_server()b.wait()while True:connection = accept_connection()process_server_connection(connection)def client():b.wait()while True:connection = make_connection()process_client_connection(connection)
构造方法
class threading.Barrier(parties, action=None, timeout=None)
创建一个需要 parties 个线程的栅栏对象。如果提供了可调用的 action 参数,它会在所有线程被释放时在其中一个线程中自动调用。 timeout 是默认的超时时间,如果没有在 wait() 方法中指定超时时间的话。
实例属性
wait(timeout=None)
冲出栅栏。当栅栏中所有线程都已经调用了这个函数,它们将同时被释放。如果提供了 timeout 参数,这里的 timeout 参数优先于创建栅栏对象时提供的 timeout 参数。
函数返回值是一个整数,取值范围在0到 parties – 1,在每个线程中的返回值不相同。可用于从所有线程中选择唯一的一个线程执行一些特别的工作。例如:
i = barrier.wait()
if i == 0:# Only one thread needs to print thisprint("passed the barrier")
如果创建栅栏对象时在构造函数中提供了 action 参数,它将在其中一个线程释放前被调用。如果此调用引发了异常,栅栏对象将进入损坏态。
如果发生了超时,栅栏对象将进入破损态。
如果栅栏对象进入破损态,或重置栅栏时仍有线程等待释放,将会引发 BrokenBarrierError 异常。
reset()
重置栅栏为默认的初始态。如果栅栏中仍有线程等待释放,这些线程将会收到 BrokenBarrierError 异常。
请注意使用此函数时,如果存在状态未知的其他线程,则可能需要执行外部同步。 如果栅栏已损坏则最好将其废弃并新建一个。
abort()
使栅栏处于损坏状态。 这将导致任何现有和未来对 wait() 的调用失败并引发 BrokenBarrierError。 例如可以在需要中止某个线程时使用此方法,以避免应用程序的死锁。
更好的方式是:创建栅栏时提供一个合理的超时时间,来自动避免某个线程出错。
parties
冲出栅栏所需要的线程数量。
n_waiting
当前时刻正在栅栏中阻塞的线程数量。
broken
一个布尔值,值为 True 表明栅栏为破损态。
exception threading.BrokenBarrierError
异常类,是 RuntimeError 异常的子类,在 Barrier 对象重置时仍有线程阻塞时和对象进入破损态时被引发。
local类
local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。
可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。
# encoding: UTF-8
import threadinglocal = threading.local()
local.tname = 'main'def func():local.tname = 'notmain'print local.tnamet1 = threading.Thread(target=func)
t1.start()
t1.join()print local.tname
notmain
main
在 with 语句中使用锁、条件和信号量
这个模块提供的带有 acquire() 和 release() 方法的对象,可以被用作 with 语句的上下文管理器。当进入语句块时 acquire() 方法会被调用,退出语句块时 release() 会被调用。因此,以下片段:
with some_lock:# do something...
相当于:
some_lock.acquire()
try:# do something...
finally:some_lock.release()
现在 Lock 、 RLock 、 Condition 、 Semaphore 和 BoundedSemaphore 对象可以用作 with 语句的上下文管理器。