听到一些关于python多进程与多线的例子,感觉比较经典,把一些例子分享一下.
内容如下:
Process、Thread、GIL、Process fork、Multiprocessing、Queue、ProcessPool、Multiprocess-Multithread comparison
(1) Process : 程序的一次执行(程序编写完毕后代码装载入内存,系统分配资源运行)。每个进程有自己的内存空间、数据栈等,只能使用进 程间通讯,而不能直接共享信息
(2) Thread线程:所有线程运行在同一个进程中,共享相同的运行环境。 每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口; 线程的运行可以被抢占(中断),或暂时被挂起 (睡眠),让其他线程运行(让步);一个进程中的各个线程间共享同一片数据空间
(3) 全局解释器锁GIL
GIL全称全局解释器锁Global Interpreter Lock,GIL并不是Python的特性,它是在实现Python解析器(CPython)时 所引入的一个概念。
GIL是一把全局排他锁,同一时刻只有一个线程在运行。 毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。
multiprocessing库的出现很大程度上是为了弥补thread库因为 GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。
多线程处理的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | from threading import Thread import time def my_counter(): i = 0 for _ in range ( 100000000 ): i = i + 1 return True def main(): thread_array = {} start_time = time.time() for tid in range ( 2 ): t = Thread(target = my_counter()) t.start() t.join() #以单线程、阻塞的方式顺序运行两次my_counter函数 end_time = time.time() print ( "Total time:{}" ). format (end_time - start_time) if __name__ = = "__main__" : main() |
执行结果如下:
Total time:12.7875118256
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | from threading import Thread import time def my_counter(): i = 0 for _ in range ( 100000000 ): i = i + 1 return True def main(): thread_array = {} start_time = time.time() for tid in range ( 2 ): t = Thread(target = my_counter()) t.start() thread_array[tid] = t for i in range ( 2 ): thread_array[i].join() #以多进程、并发的方式运行两次my_counter函数 end_time = time.time() print ( "Total time:{}" ). format (end_time - start_time) if __name__ = = "__main__" : main() |
执行结果如下:
Total time:15.8216409683
上述两个例子的结果发现:单线程运行两次函数的时间 要比 两个线程同时运行函数的时间短(仅限于本例)。
(4) fork操作:
调用一次,返回两次。因为操作系统自动把当前进程(称为父 进程)复制了一份(称为子进程),然后分别在父进程和子进 程内返回。子进程永远返回0,而父进程返回子进程的ID。子 进程只需要调用getppid()就可以拿到父进程的ID。
例:
1 2 3 4 5 6 7 | import os print 'Process (%s) start ...' % os.getpid() pid = os.fork() if pid = = 0 : print 'I am child process (%s) and my parent is (%s)' % (os.getpid(),os.getppid()) else : print 'I (%s) just created a child process (%s).' % (os.getpid(),pid) |
执行结果如下:
Process (16480) start ...
I (16480) just created a child process (16481).
I am child process (16481) and my parent is (16480)
(5) multiprocessing是跨平台版本的多进程模块,它提供了一个Process类来代表一个进程对象,下面是示例代码:
1 2 3 4 5 6 7 8 9 10 11 | from multiprocessing import Process import time def f(n): time.sleep( 1 ) print n * n if __name__ = = "__main__" : for i in range ( 10 ): p = Process(target = f,args = [i,]) p.start() #使用多进程并发的方式执行函数f |
这个程序如果用单进程写则需要执行10秒以上的时间, 而用多进程则启动10个进程并行执行,只需要用1秒多的时间。多进程时,每个进程各自有各自的GIL。而同一进程中的多线程受到GIL的影响,效率返而会下降。
(6) Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | from multiprocessing import Process,Queue import time def write(q): for i in [ 'A' , 'B' , 'C' , 'D' , 'E' ]: print ( 'Put %s to queue' % i) q.put(i) time.sleep( 0.5 ) def read(q): while True : v = q.get( True ) print ( 'get %s from queue' % v) if __name__ = = '__main__' : q = Queue() pw = Process(target = write,args = (q,)) pr = Process(target = read,args = (q,)) pw.start() pr.start() pr.join() pr.terminate() |
输出结果:
Put A to queue
get A from queue
Put B to queue
get B from queue
Put C to queue
get C from queue
Put D to queue
get D from queue
Put E to queue
get E from queue
(7) 进程池pool , 用于批量创建子进程,可以灵活控制子进程的数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | from multiprocessing import Pool import time def f(x): print x * x time.sleep( 2 ) return x * x if __name__ = = '__main__' : pool = Pool(processes = 5 ) res_list = [] for i in range ( 10 ): res = pool.apply_async(f,[i,]) ''' 以异步并行的方式启动进程处理函数f,如果要同步等待的方式,可以在每次进程启动之后调用res.get()方法,也可以使用Pool.apply''' print ( '-------:' ,i) res_list.append(res) pool.close() pool.join() for r in res_list: print 'result' ,(r.get(timeout = 5 )) |
输出结果如下:
('-------:', 0)
('-------:', 1)
('-------:', 2)
('-------:', 3)
('-------:', 4)
('0-------:', 5)
('-------:', 6)
('-------:', 7)
1
('-------:', 8)
('-------:', 9)
4
16
9
25
36
49
64
81
result 0
result 1
result 4
result 9
result 16
result 25
result 36
result 49
result 64
result 81
如果使用同步方式,处理函数时 必须等待 前一个处理的结束,所以如果将该程序换为同步方式,输出结果则是顺序的。
(8) 多进程与多线程的对比:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | from multiprocessing import Process import threading import time lock = threading.Lock() def run(info_list,n): lock.acquire() info_list.append(n) lock.release() print ( '%s' % info_list) if __name__ = = '__main__' : info = [] for i in range ( 10 ): p = Process(target = run,args = [info,i]) p.start() p.join() time.sleep( 1 ) print ( '-----------------threading--------------' ) for i in range ( 10 ): p = threading.Thread(target = run,args = [info,i]) p.start() p.join() |
输出结果为:
[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
-----------------threading--------------
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 7]
[0, 1, 2, 3, 4, 5, 6, 7, 8]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
多进程间数据不能直接共享,每次处理函数run的结果都不能继承。而多线程间数据可以共享,但受到GIL的影响,run函数中在将数据追加到列表时,使用lock锁,追回完毕再释放lock,这样避免冲突。