Python 中 Multiprocessing 实现进程通信
- 1. 如何建立主进程与子进程之间的通信管道?
- 2. 为什么一定要将Pipe中的某些端close()?
本文参考自:python 学习笔记 - Queue & Pipes,进程间通讯
1. 如何建立主进程与子进程之间的通信管道?
在Python中很多时候需要用到多进程并行编程,由于每个进程都拥有自己的独立内存空间,无法像线程一样通过访问全局变量来共享数据。因此,进程之间的通信比线程通信要更加的复杂。在 Multiprocessing 包中存在 Pipe 类,Pipe(管道)能够实现进程之间更高效的通信,倘若我们现在由一个主进程,主进程创建了一个子进程,那么如何通过建立主进程与子进程之间的通信呢?
- 当主进程创建Pipe的时候,Pipe的两个Connections连接的都是主进程;
- 当主进程创建子进程后,Connections被拷贝了一份,此时一共有2(主进程)+ 2(子进程)= 4 个Connections;
- 随后,我们关闭主进程中的 out_connection 和子进程中的 in_connection 端口,即可建立一条主进程通往子进程的管道了;
NOTE:
in_connection 和 out_connection 中的 in/out 是针对 Pipe 管道来说的,in 是指数据流入Pipe的那条管道。此外,由于 Pipe 对象默认是双向的,因此下图中的箭头管道上的数据流实际上可以是双向的,既能从上往下也能从下往上。
实现代码如下所示:
from multiprocessing import Pipe, Processdef son_process(x, pipe):""" 注意,out在前,in在后 """_out_pipe, _in_pipe = pipe""" 关闭拷贝过来的输入端 """_in_pipe.close()while True:try:msg = _out_pipe.recv()print msgexcept EOFError:""" 当out_pipe接受不到输出的时候且输入被关闭的时候,会抛出EORFError,可以捕获并且退出子进程 """breakif __name__ == '__main__':out_pipe, in_pipe = Pipe(True)son_p = Process(target=son_process, args=(100, (out_pipe, in_pipe)))son_p.start()""" 等pipe被拷贝后,关闭主进程的输出端,这样,创建的Pipe一端连接着主进程的输入,一端连接着子进程的输出口 """out_pipe.close()for x in range(1000):in_pipe.send(x)in_pipe.close()son_p.join()print("主进程结束")
2. 为什么一定要将Pipe中的某些端close()?
由于Pipe之间的通信时通过,in_conn.send()、out_conn.recv() 这种方式进行通信的,因此如果当某一方调用了 .recv() 函数但一直没有另外的端口使用 .send() 方法的话,recv() 函数就会阻塞住。为了避免程序阻塞,我们在明确另一个端口不会再调用 .send() 函数后可以直接将发送端口给 close(),这样以来如果接收端还在继续调用 .recv() 方法的话程序就会抛出 EOFError 的异常,示例代码如下:
from multiprocessing import Process
from multiprocessing import Pipe
import os
import timedef send(out_conn):out_conn.send("hello")""" 发完一次内容后,就将子进程中的 out_conn 给 close(),注意外部(主进程中)的 out_conn 也需要被 close """out_conn.close() def recv(in_conn):try:print(in_conn.recv())print(in_conn.recv()) # 第二次调用 .recv() 会抛出 EOFError 异常,因为 out_conn 已经被关闭了except EOFError:print("end")if __name__ == "__main__":out_conn, in_conn = Pipe(True)p_send = Process(target=send, args=(out_conn,))p_send.start()p_recv = Process(target=recv, args=(in_conn,))p_recv.start()out_conn.close() # 在子进程拷贝完 Pipe Connections 之后,外部的 out_conn 也需要被关闭p_send.join()p_recv.join()