18.2 线程和进程
18.2.1 什么是进程?
18.2.1 什么是进程?
计算机程序只不过是磁盘中可执行的,二进制的数据。它们只有在被读取到内存中,被操作系统调用的时候才开始它们的生命周期。进程(重量级进程)是程序的一 次执行,每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。操作系统管理在其上运行的所有进程,并为这些进程公平的分配时间。 进程也可以通过fork和spawn操作来完成其它的任务。不过各个进程有自己的内存空间,数据栈等,所以只能使用进程间通讯(IPC),而不能直接共享 信息。
18.2.2 什么是线程
线程(轻量级进程)跟进程有些相似,不同的是:所有的线程运行在同一个进程中,共享相同的运行环境。它们可以想象成是在主进程或“主线程”中并行运行的“迷你进程”。
线程有开始,顺序执行和结束三部分。它有一个自己的指令指针,记录自己运行到什么地方。线程的运行可能被抢占(中断),或暂时的被挂起(也叫睡眠),让其 它的线程运行,这叫做让步。一个进程中的各个线程之间共享同一片数据空间,所以线程之间可以比进程之间更方便的共享数据以及相互通讯。线程一般都是并发执 行的,正式由于这种并行和数据共享的机制使得多个任务的合作变成可能。实际上,在单CPU的系统中,真正的并发是不可能的,每个线程会被安排成每次只运行 一会,然后就把CPU让出来,让其它的线程去运行。在进程的整个运行过程中,每个线程都只做自己的事,在需要的时候跟其它的线程共享运行的结果。
当然,这样的共享并不是完全没有危险的。如果多个线程共同访问同一片数据,则由于数据访问的顺序不一样,有可能导致数据结果的不一致问题。
另一个需要注意的地方是:由于有的函数会在完成之前阻塞住,在没有特别为多线程做修改的情况下,这种“贪婪”的函数会让CPU的市价分配有所倾斜。导致各个线程分配到的运行时间可能不尽相同,不尽公平。
18.3.5 Python的threading模块
核心提示:避免使用thread模块。更高级别的 threading 模块更为先进,对线程的支持更为完善,而且使用 thread 模块里的属性有可能会与 threading 出现冲突。其次,低级别的 thread 模块的同步原语很少(实际上只有一个),而 threading 模块则有很多。
thread 模块函数
start_new_thread(function,
args, kwargs=None) 产生一个新的线程,在新线程中用指定的参数和可选的
kwargs来调用这个函数。
allocate_lock() 分配一个LockType 类型的锁对象
exit() 让线程退出
LockType类型锁对象方法
acquire(wait=None) 尝试获取锁对象
locked() 如果获取了锁对象返回True,否则返回False
release() 释放锁
例子1:简单的多线程
1 from time import sleep, ctime 2 import thread 3 def loop0(): 4 print "start loop 0 at:", ctime() 5 sleep(4) 6 print "loop 0 done at:", ctime() 7 def loop1(): 8 print "start loop 1 at:", ctime() 9 sleep(2) 10 print "loop 1 done at:", ctime() 11 def main(): 12 print "starting at:", ctime() 13 thread.start_new_thread(loop0, ()) 14 thread.start_new_thread(loop1, ()) 15 sleep(6) 16 print "all DONE at:", ctime() 17 18 if __name__ == "__main__": 19 main()
其中sleep(6)是为了让线程中的sleep(2)和sleep(4)能够完整运行(而不是随着主线程结束直接终止)。
我们可以通过使用锁来保证主线程不会提前结束。
1 #!/usr/bin/env python 2 3 import thread 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 def loop(nloop, nsec, lock): 9 print 'start loop', nloop, 'at: ', ctime() #运行时这条语句有错误输出 10 sleep(nsec) 11 print 'loop', nloop, 'done at: ', ctime() 12 lock.release() 13 14 def main(): 15 print 'starting at:', ctime() 16 locks = [] 17 nloops = range(len(loops)) 18 19 for i in nloops: 20 lock = thread.allocate_lock() 21 lock.acquire() #尝试获得锁(将锁锁上) 22 locks.append(lock) 23 for i in nloops: 24 thread.start_new_thread(loop, (i, loops[i], locks[i])) 25 26 for i in nloops: 27 while locks[i].locked(): 28 pass 29 print 'all DONE at:', ctime() 30 main()
18.5 threading 模块
接下来,我们要介绍的是更高级别的threading模块,它不仅提供了Thread类,还提供了各种好用的同步机制。
threading 模块对象 描述
Thread 表示一个线程的执行的对象
Lock 锁原语对象(跟thread 模块里的锁对象相同)
RLock 可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定)。
Condition 条件变量对象能让一个线程停下来,等待其它线程满足了某个“条件”。
如,状态的改变或值的改变。
Event 通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后,
所有的线程都会被激活。
Semaphore 为等待锁的线程提供一个类似“等候室”的结构
BoundedSemaphore 与Semaphore 类似,只是它不允许超过初始值
Timer 与Thread 相似,只是,它要等待一段时间后才开始运行。
核心提示:守护线程
另一个避免使用thread 模块的原因是,它不支持守护线程。当主线程退出时,所有的子线程不论它们是否还在工作,都会被强行退出。有时,我们并不期望这种行为,这时,就引入了守护线程的概念
threading 模块支持守护线程,它们是这样工作的:守护线程一般是一个等待客户请求的服务器,如果没有客户提出请求,它就在那等着。如果你设定一个线程为守护线程,就 表示你在说这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。就像你在第16 章网络编程看到的,服务器线程运行在一个无限循环中,一般不会退出。
如果你的主线程要退出的时候,不用等待那些子线程完成,那就设定这些线程的daemon 属性。即,在线程开始(调用thread.start())之前,调用setDaemon()函数设定线程的daemon 标志(thread.setDaemon(True))就表示这个线程“不重要”
如果你想要等待子线程完成再退出, 那就什么都不用做, 或者显式地调用thread.setDaemon(False)以保证其daemon 标志为False。你可以调用thread.isDaemon()函数来判断其daemon 标志的值。新的子线程会继承其父线程的daemon 标志。整个Python 会在所有的非守护线程退出后才会结束,即进程中没有非守护线程存在的时候才结束。
18.5.1Thread类
函数 描述
start() 开始线程的执行
run() 定义线程的功能的函数(一般会被子类重写)
join(timeout=None) 程序挂起,直到线程结束;如果给了timeout,则最多阻塞timeout 秒
getName() 返回线程的名字
setName(name) 设置线程的名字
isAlive() 布尔标志,表示这个线程是否还在运行中
isDaemon() 返回线程的daemon 标志
setDaemon(daemonic) 把线程的daemon 标志设为daemonic(一定要在调用start()函数前调用)
有三种方法用Thread类创建线程。
1. 创建一个Thread的实例,传给它一个函数
2. 创建一个Thread的实例,传给它一个可调用的类对象
3. 从Thread派生出一个子类, 创建一个这个子类的实例
下面分别给出例子
1.创建一个Thread的实例,传给它一个函数
1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 def loop(nloop, nsec): 9 print 'start loop', nloop, 'at:', ctime() 10 sleep(nsec) 11 print 'loop', nloop, 'done at:', ctime() 12 13 def main(): 14 print 'starting at:', ctime() 15 threads = [] 16 nloops = range(len(loops)) 17 18 for i in nloops: 19 t = threading.Thread(target = loop, args = (i, loops[i])) 20 threads.append(t) 21 22 for i in nloops: 23 threads[i].start() 24 25 for i in nloops: 26 threads[i].join() 27 28 print 'all DONE at:', ctime() 29 30 if __name__ == '__main__': 31 main()
所有线程都创建了之后,再一起调用start()函数启动,而不是创建一个启动一个,而且,不用再管理一堆锁(分配锁,获得锁,释放锁,检查锁的状态),只要简单地对每个线程调用join()函数就可以了。
join()会等到线程结束,或者在给力timeout参数的情况下等到超时,使用join()会比使用一个等待锁释放的无限循环更清晰(这种锁也被称为自旋锁)。
join()的另一个比较重要的方面是它可以完全不被调用。事实上一旦线程启动以后就会一直运行,直到线程的函数结束。如果你的主线程除了等线程结束以外还有其他的事情要做,那就不用调用join(),只有当你要等待线程结束的时候才调用join()。
2.创建一个Thread的实例,传给它一个可调用的类对象。
1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 class ThreadFunc(object): 9 10 def __init__(self, func, args, name=''): 11 self.name = name 12 self.func = func 13 self.args = args 14 15 def __call__(self): 16 self.res = self.func(*self.args) 17 18 def loop(nloop, nsec): 19 print 'start loop', nloop, 'at:', ctime() 20 sleep(nsec) 21 print 'loop', nloop, 'done at:', ctime() 22 23 def main(): 24 print 'starting at:', ctime() 25 threads = [] 26 nloops = range(len(loops)) 27 28 for i in nloops: 29 t = threading.Thread(target = ThreadFunc(loop, (i,loops[i]), 30 loop.__name__)) 31 threads.append(t) 32 33 for i in nloops: 34 threads[i].start() 35 36 for i in nloops: 37 threads[i].join() 38 39 print 'all DONE at:', ctime() 40 41 if __name__ == '__main__': 42 main() 43
3.从Thread类中派生出一个子类,创建一个这个子类的实例
mtsleep5和mtsleep4的最大区别在于1.MyThread子类的构造器一定要先调用其基类的构造器 2.之前的特殊函数__call__()在子类中,名字要改为run()。
1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = (4, 2) 7 8 class MyThread(threading.Thread): 9 def __init__(self, func, args, name=''): 10 threading.Thread.__init__(self) 11 self.name = name 12 self.func = func 13 self.args = args 14 15 def run(self): 16 self.func(*self.args) #apply(self.func, self.args) 17 18 def loop(nloop, nsec): 19 print 'start loop', nloop, 'at:', ctime() 20 sleep(nsec) 21 print 'loop', nloop, 'done at:', ctime() 22 23 def main(): 24 print 'starting at:', ctime() 25 threads = [] 26 nloops = range(len(loops)) 27 28 for i in nloops: 29 t = MyThread(loop, (i, loops[i]), loop.__name__) 30 threads.append(t) 31 32 for i in nloops: 33 threads[i].start() 34 35 for i in nloops: 36 threads[i].join() 37 38 print 'all DONE at:', ctime() 39 40 if __name__ == '__main__': 41 main()
为了让mtsleep5中的Thread的子类更为通用我们把子类单独放在一个模块中,并加上一个getResult()函数用以返回函数的运行结果
1 #!/usr/bin/eny python 2 3 import threading 4 from time import ctime 5 6 class MyThread(threading.Thread): 7 def __init__(self, func, args, name=''): 8 threading.Thread.__init__(self) 9 self.name = name 10 self.func = func 11 self.args = args 12 13 def getResult(self): 14 return self.res 15 16 def run(self): 17 print 'starting', self.name, 'at:'. ctime() 18 self.res = self.func(*self.args) 19 print self.name, 'finished at:', ctime()
接下来给出一个脚本比较递归求斐波那契、阶乘和累加和函数的运行。脚本先在单线程中运行再在多线程中运行以说明多线程的好处。
1 #!/usr/bin/eny python 2 3 from myThread import MyThread 4 from time import ctime, sleep 5 6 def fib(x): 7 sleep(0.005) 8 if x < 2: return 1 9 return (fib(x-2) + fib(x-1)) 10 11 def fac(x): 12 sleep(0.1) 13 if x < 2:return 1 14 return (x * fac(x-1)) 15 16 def sum(x): 17 sleep(0.1) 18 if x < 2:return 1 19 return (x + sum(x - 1)) 20 21 funcs = [fib, fac, sum] 22 n = 12 23 24 def main(): 25 nfuncs = range(len(funcs)) 26 27 print '*** SINGLE THREAD' 28 for i in nfuncs: 29 print 'starting', funcs[i].__name__, 'at:', ctime() 30 print funcs[i](n) 31 print funcs[i].__name__, 'finished at:', ctime() 32 33 print '\n*** MUTIPLE THREADS' 34 threads = [] 35 for i in nfuncs: 36 t = MyThread(funcs[i],(n,), funcs[i].__name__) 37 threads.append(t) 38 39 for i in nfuncs: 40 threads[i].start() 41 42 for i in nfuncs: 43 threads[i].join() 44 print threads[i].getResult() 45 46 print 'all DONE' 47 48 if __name__ == '__main__': 49 main()
18.5.3threading模块中的其他函数
函数 | 描述 |
activeCount() | 当前活动的线程对象的数量 |
crrentThread() | 返回当前线程对象 |
enumerate() | 返回当前活动线程的列表 |
settrace(func) | 为所有线程设置一个跟踪函数 |
setprofile(func) | 为所有线程设置一个profile函数 |
18.5.4 生产者-消费者问题和Queue模块
生产者-消费者问题,就是生产者把生产的货物放进队列一类的数据结构中供消费者使用,其中生产货物和消费货物的时间都是不固定的。
Queue模块可以用来解决生产者-消费者问题,让各个线程之间通信,所用到的属性如下:
函数 | 描述 |
Queue模块函数 | |
queue(size) | 创建一个大小为size的Queue对象 |
Queue对象函数 | |
qsize() | 返回队列的大小(由于在返回的时候,队列可能会被其他线程修改,所以这个值是近似值) |
empty() | 如果队列为空返回True, 否则返回False |
full() | 如果队列已满返回True,否则返回False |
put(item, block=0) | 把item放到队列中,如果给了block,函数会一直阻塞到队列中有空间为止 |
get(block=0) | 从队列中取一个对象,如果给了block,函数会一直阻塞直到队列中有对象为止。 |
1 #!/usr/bin/env python 2 3 from random import randint 4 from time import sleep 5 from Queue import Queue 6 from myThread import MyThread 7 8 def writeQ(queue): 9 print 'producing object for Q...' 10 queue.put('xxx', 1) 11 print "size now", queue.qsize() 12 13 def readQ(queue): 14 val = queue.get(1) 15 print 'consumed object from Q... size now', queue.qsize() 16 17 def writer(queue, loops): 18 for i in range(loops): 19 writeQ(queue) 20 sleep(randint(1, 3)) 21 22 def reader(queue, loops): 23 for i in range(loops): 24 readQ(queue) 25 sleep(randint(1, 3)) 26 27 funcs = [writer, reader] 28 nfuncs = range(len(funcs)) 29 30 def main(): 31 nloops = randint(2, 5) 32 q = Queue(32) 33 34 threads = [] 35 for i in nfuncs: 36 t = MyThread(funcs[i], (q, nloops), funcs[i].__name__) 37 threads.append(t) 38 39 for i in nfuncs: 40 threads[i].start() 41 42 for i in nfuncs: 43 threads[i].join() 44 45 print 'all DONE' 46 47 if __name__ == '__main__': 48 main()
18.6相关模块
模块 | 描述 |
thread | 基本的、低级别的线程模块 |
threading | 高级别的线程和同步对象 |
Queue | 供多线程使用的同步队列 |
mutex | 互斥对象 |
SocketServer | 具有线程控制的TCP和UDP管理器 |