python 多进程
- 多进程和多线程
- 什么是进程
- 什么是线程
- 多进程和多线程的区别与联系
- 优缺点
- 多线程的优点
- 多线程的缺点
- 多进程的优点
- 多进程的缺点
- 选择多线程还是多进程?
- 多进程
- 使用multiprocessing.Process创建多进程
- 通过multiprocessing.Process模块创建
- 继承Process类来创建多进程
- 使用 multiprocessing.Pool 创建多进程
- 进程通信
- Queue
- Pipe
参考 https://blog.csdn.net/u010230019/article/details/128455712
多进程和多线程
进程是Python中最小的资源分配单元,进程中间的数据,内存是不共享的,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价比较大。
线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位,一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。 在同一个进程内的线程的数据是可以进行互相访问的,这点区别于多进程。
一个进程至少要包含一个线程,每个进程在启动的时候就会自动的启动一个线程,进程里面的第一个线程就是主线程,每次在进程内创建的子线程都是由主线程进程创建和销毁,子线程也可以由主线程创建出来的线程创建和销毁线程。
Python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源,在Python中大部分情况需要使用多进程。
什么是进程
计算机程序只不过是磁盘中可执行的,二进制(或其它类型)的数据。它们只有在被读取到内 存中,被操作系统调用的时候才开始它们的生命期。进程(有时被称为重量级进程)是程序的一次执行。每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。
操作系统管理在其上运行的所有进程,并为这些进程公平地分配时间。进程也可以通过 fork 和 spawn 操作来完成其它的任务。不过各个进程有自己的内存空间,数据栈等,所以只能使用进程间通讯(IPC), 而不能直接共享信息。
什么是线程
线程(有时被称为轻量级进程)跟进程有些相似,不同的是,所有的线程运行在同一个进程中, 共享相同的运行环境。它们可以想像成是在主进程或“主线程”中并行运行的“迷你进程”。
线程有开始,顺序执行和结束三部分。它有一个自己的指令指针,记录自己运行到什么地方。 线程的运行可能被抢占(中断),或暂时的被挂起(也叫睡眠),让其它的线程运行,这叫做让步。 一个进程中的各个线程之间共享同一片数据空间,所以线程之间可以比进程之间更方便地共享数据 以及相互通讯。线程一般都是并发执行的,正是由于这种并行和数据共享的机制使得多个任务的合作变为可能。实际上,在单 CPU 的系统中,真正的并发是不可能的,每个线程会被安排成每次只运行一小会,然后就把 CPU 让出来,让其它的线程去运行。在进程的整个运行过程中,每个线程都只做自己的事,在需要的时候跟其它的线程共享运行的结果。
当然,这样的共享并不是完全没有危险的。如果多个线程共同访问同一片数据,则由于数据访问的顺序不一样,有可能导致数据结果的不一致的问题。这叫做竞态条件(race condition)。幸运的是,大多数线程库都带有一系列的同步原语,来控制线程的执行和数据的访问。
另一个要注意的地方是,由于有的函数会在完成之前阻塞住,在没有特别为多线程做修改的情况下,这种“贪婪”的函数会让 CPU 的时间分配有所倾斜。导致各个线程分配到的运行时间可能不尽相同,不尽公平。
多进程和多线程的区别与联系
- 线程是执行的指令集,进程是资源的集合;
- 线程的启动速度比进程的启动速度要快;
- 两个线程的执行速度是一样的;
- 进程与线程的运行速度是没有可比性的;
- 线程共享创建它的进程的内存空间,进程的内存是独立的;
- 两个线程共享的数据都是同一份数据,而两个子进程的数据不是共享的,数据是独立的;
- 同一个进程的线程之间可以直接交流,同一个主进程的多个子进程之间是不可以进行交流,如果两个进程之间需要通信,就必须要通过一个中间代理来实现;
- 一个新的线程很容易被创建,一个新的进程创建需要对父进程进行一次克隆;
- 一个线程可以控制和操作同一个进程里的其他线程,线程与线程之间没有隶属关系,但是进程只能操作子进程;
- 改变主线程,有可能会影响到其他线程的行为,但是对于父进程的修改是不会影响子进程;
优缺点
多线程的优点
-
轻量级:线程的创建和上下文切换比进程要快得多,占用的资源也比较少。
-
共享内存:多个线程可以共享进程的内存空间,方便数据的传递和共享。
-
适合I/O密集型任务:多线程适合处理I/O密集型任务,如网络爬虫、文件读写等任务。
多线程的缺点
-
GIL限制:Python的全局解释器锁(GIL)限制了同一时刻只能有一个线程执行Python字节码,导致多线程无法利用多核CPU的优势。
-
线程不安全:线程之间共享内存,容易出现竞争条件,需要使用锁机制来保证线程安全。
-
难以调试:多线程程序难以调试,因为线程之间的执行顺序不确定。
多进程的优点
-
多核利用:多进程可以同时利用多个CPU核心,提高程序的运行效率。
-
各进程独立:各个进程之间独立运行,互不干扰,不容易出现竞争条件。
-
隔离性好:每个进程都有独立的内存空间,不会相互影响。
多进程的缺点
-
创建和上下文切换开销大:创建进程的开销比线程大,进程之间的上下文切换也比线程慢。
-
不易共享数据:进程之间不能共享内存,需要使用IPC机制来传递数据。
-
系统资源占用:每个进程都需要占用一定的系统资源,如内存、文件描述符等。
选择多线程还是多进程?
对于多线程:
Python 的多线程库 threading 在某些情况下确实是鸡肋的,这是因为 Python 的全局解释器锁(Global Interpreter Lock, GIL)导致了多线程的并发性能不能真正发挥出来。简单来说,这意味着在任何给定时刻只有一个线程能够真正地运行 Python 代码,这就限制了多线程的性能。
然而,对于一些特定类型的任务,比如 I/O 密集型的任务,多线程还是可以带来性能提升的。这是因为 I/O 操作通常会导致线程阻塞,让其他线程得以运行。此外,在 Python3 中,对于一些特殊情况,比如使用 asyncio 库,也可以通过协程实现并发执行,从而规避 GIL 的限制。
对于多进程:
Python 的多进程库 multiprocessing 是可以真正发挥出多核处理器的性能的,因为每个进程都有自己的解释器和 GIL。这意味着每个进程可以独立地运行 Python 代码,从而实现真正的并行处理。
当然,多进程也有一些缺点,比如进程之间的通信和数据共享比较麻烦。此外,每个进程的启动和销毁都会涉及到一定的开销,因此如果任务很小,多进程可能反而会带来性能下降。
选择多线程还是多进程? 在这个问题上,首先要看下你的程序是属于哪种类型的。一般分为两种:CPU密集型和I/O密集型。
CPU 密集型:程序比较偏重于计算,需要经常使用CPU来运算。例如科学计算的程序,机器学习的程序等。
I/O 密集型:顾名思义就是程序需要频繁进行输入输出操作。爬虫程序就是典型的I/O密集型程序。
如果程序是属于CPU密集型,建议使用多进程。而多线程就更适合应用于I/O密集型程序。
多进程
本文是基于 python2.7.8 编写
使用multiprocessing.Process创建多进程
通过multiprocessing.Process模块创建
multiprocessing
是python的多进程管理包,和threading.Thread
类似。multiprocessing模块可以让程序员在给定的机器上充分地利用CPU。multiprocessing模块提供了一个Process类,可以用来创建和管理进程。我们可以通过直接从 multiprocessing.Process
继承创建一个新的子类,并实例化后调用 start()
方法启动新进程,即相当于它调用了进程的 run()
方法。
创建多进程语法如下:
multiprocessing.Process(group=None, target=function, name=None, args=(), kwargs={}, *, daemon=None)
参数说明:
function - 函数名args - 传递给函数的参数,必须是个tuple类型kwargs - 可选参数name - 给进程设定一个名字,也可以不设定group - 指定进程组,大多数情况下用不到
Process创建的实例对象的常用方法
- start():启动子进程实例(创建子进程)
- is_alive():判断进程子进程是否还在活着
- join([timeout]):是否等待子进程执行结束,或等待多少秒
- terminate():不管任务是否完成,立即终止子进程
Process创建的实例对象的常用属性
- name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
- pid:当前进程的pid(进程号)
示例:
#coding:utf-8
import multiprocessingdef worker():"""该函数将在子进程中执行"""print('Worker')if __name__ == '__main__':# 创建子进程p = multiprocessing.Process(target=worker)# 启动子进程p.start()print p.nameprint p.pid# 等待子进程结束p.join()
-----------------
Process-1
29544
Worker
注意使用多进程的时候需要特别注意,必须要有
if __name__ == '__main__':
, 该语句下的代码相当于主进程,没有该语句会报错。
在 multiprocessing.Process 的源码中是对子进程再次产生子进程是做了限制的,是不允许的,于是便会出现错误。
在上面的代码中,worker函数将在子进程中执行。首先,创建了一个Process对象,指定target参数为worker函数。然后,通过调用start方法启动子进程,最后调用join
方法等待子进程结束。
- daemon:如果当前python进程是守护进程,那么意味着这个进程是“不重要”的,“不重要”意味着如果他的主进程结束了但该守护进程没有运行完,守护进程就会被强制结束。如果进程是非守护进程,那么父进程只有等到非守护进程运行完毕后才能结束。如果需要设置一个进程为守护进程,只需要将其
daemon
参数设置为True
即可。
当守护进程设置为False,主进程会等待子进程结束后才结束#coding:utf-8 import multiprocessing import time def hello():time.sleep(10)print("hello world 1")if __name__ == '__main__':p = multiprocessing.Process(target=hello)# 设置守护进程p.daemon = Truep.start()print("hello world 2")print("hello world 3")print(p.daemon) ------------------- hello world 2 hello world 3 True-------------------
#coding:utf-8 import multiprocessing import time def hello():time.sleep(10)print("hello world 1")if __name__ == '__main__':p = multiprocessing.Process(target=hello)# 设置守护进程p.daemon = Falsep.start()print("hello world 2")print("hello world 3")print(p.daemon) -------------- hello world 2 hello world 3 False hello world 1 --------------
默认daemon为False
- join:join()方法可以在当前位置阻塞主进程,待执行join()的子进程结束后再继续执行主进程的代码逻辑。举个例子来说:
主进程会等待子进程执行完毕后,再执行剩余代码。
当注释掉jion部分,主进程会执行完其程序后结束,并不会等待子进程。#coding:utf-8 from multiprocessing import Process import os import timedef run_proc(name):print("进入子进程【run_proc】, {time.time()}")time.sleep(5)print('Run 【run_proc】child process %s (%s)...' % (name, os.getpid()))def hello_world():print("进入子进程【hello_world】, {time.time()}")# time.sleep(5)time.sleep(10)print('hello world!')print('Run 【hello_world】child process (%s)...' % (os.getpid()))if __name__ == '__main__':print('Parent process %s.' % os.getpid())p1 = Process(target=run_proc, args=('test',))p2 = Process(target=hello_world)print('Process will start.')p1.start()p2.start()p1.join()p2.join()print('Process end.')-------------------------------------------- Parent process 25616. Process will start. 进入子进程【run_proc】, {time.time()} 进入子进程【hello_world】, {time.time()} Run 【run_proc】child process test (28084)... hello world! Run 【hello_world】child process (23784)... Process end.
#coding:utf-8 from multiprocessing import Process import os import timedef run_proc(name):print("进入子进程【run_proc】, {time.time()}")time.sleep(5)print('Run 【run_proc】child process %s (%s)...' % (name, os.getpid()))def hello_world():print("进入子进程【hello_world】, {time.time()}")# time.sleep(5)time.sleep(10)print('hello world!')print('Run 【hello_world】child process (%s)...' % (os.getpid()))if __name__ == '__main__':print('Parent process %s.' % os.getpid())p1 = Process(target=run_proc, args=('test',))p2 = Process(target=hello_world)print('Process will start.')p1.start()p2.start()# p1.join()# p2.join()print('Process end.') -------------------------------------- Parent process 3144. Process will start. Process end. 进入子进程【run_proc】, {time.time()} 进入子进程【hello_world】, {time.time()} Run 【run_proc】child process test (24456)... hello world! Run 【hello_world】child process (15048)...
示例:
#coding:utf-8
import multiprocessing
import timedef worker(num):"""工作函数"""print('Worker:', num)time.sleep(3)if __name__ == '__main__':print("start:")# 创建4个进程并发运行worker函数processes = []for i in range(4):p = multiprocessing.Process(target=worker, args=(i,))processes.append(p)for process in processes:process.start()# 等待所有子进程结束for process in processes:process.join()print("end")
---------------------------
start:
('Worker:', 0)
('Worker:', 1)
('Worker:', 2)
('Worker:', 3)
end
daemon和join()还是需要细细品,daemon如果设置为False,则主进程会等待子进程执行结束后才结束,但实际在执行子进程的时候,主进程的代码已经执行完毕。
而join()会设置阻塞点,在此位置,当所有子进程结束后,主进程才会把阻塞点后面的代码进行执行直至完毕。
继承Process类来创建多进程
同继承Thread类创建多线程类似,多进程也可以通过继承Process类的方式来创建。
继承Process类来创建多进程的方式需要重写Process中的run()方法,run方法中是该线程要执行的动作。
#coding:utf-8
import multiprocessingclass MyProcess(multiprocessing.Process):def __init__(self, name):super(MyProcess, self).__init__()self.name = namedef run(self):print('Hello from process', self.name)if __name__ == '__main__':print("start")p1 = MyProcess("process_1")p2 = MyProcess("process_2")p1.start()p2.start()p1.join()p2.join()print("end")
---------------------------------
start
('Hello from process', 'process_1')
('Hello from process', 'process_2')
end
使用 multiprocessing.Pool 创建多进程
进程池:Pool 可以提供指定数量的进程,供用户调用,当有新的请求提交到 pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。
Pool和Process的主要区别:
- Process需要自己管理进程,起一个Process就是起一个新进程;
- Pool是进程池,它可以开启固定数量的进程,然后将任务放到一个池子里,系统来调度多进程执行池子里的任务;
使用场景
-
Process适用的场景:
如果目标少且不用控制进程数量则可以用Process类 -
Pool适用的场景:
- 可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;
- 但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。
- Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时
该模块常用方法:
Pool(numprocess, initializer, initargs)
- numprocess 是要创建的进程数。如果省略此参数,将使用 cpu_count()的值。cpu是4核,就创建有4个进程的进程池,8核就创建8个进程的进程池
- Initializer是每个工作进程启动时要执行的可调用对象。 Initargs 是要传递给 initializer 的参数元组。
- Initializer 默认为 None
Pool有以下方法可以调用:
apply(不推荐):同步阻塞执行,上一个子进程结束后才能进行下一个子进程apply_async
(推荐):各个进程执行顺序不确定异步非阻塞执行,每个子进程都是异步执行的(并行)
使用非阻塞的方式调用 func(任务并行执行),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表。apply_async(func[, args[, kwds[, callback[, error_callback]]]])
map:(不推荐)同步阻塞- map_async(推荐):各个进程执行顺序确定,异步非阻塞
- imap:内存不够用可以采用此种方式
- imap_unordered:imap()的无序版本(不会按照调用顺序返回,而是按照结束顺序返回),返回迭代器实例
- starmap:可以使子进程活动接收多个参数,而map只能接收一个参数
- starmap_async
- terminate
不管任务是否完成,立即终止terminate()
- close
关闭Pool,使其不再接受新的任务。close()
- join
主进程阻塞,等待子进程的退出,必须在 close 或 terminate 之后使用。join()
apply_async示例
#coding:utf-8
import multiprocessingdef worker(num):"""工作函数"""print('Worker:', num)if __name__ == '__main__':print("start")# 创建一个包含4个进程的池子pool = multiprocessing.Pool(processes=4)# 向池子提交任务for i in range(10000):pool.apply_async(worker, args=(i,))# 关闭池子,不再接收新的任务pool.close()# 等待所有任务完成并结束进程pool.join()print("end")
------------------------------------
start
('Worker:', 0)
('Worker:', 1)
...
('Worker
(:', 9892)
('Worker:'(('Work','eWo r:',rWorke kerr:',:' 9989, 893)
1)
('Worker:', 98969895('W))
orker:'
9894)
('Wor(')ker:'Wo
, 9997)
('Workerker:'r:, '9998), 9999)end
这段代码会创建一个包含4个进程的池子(通过设置processes参数),然后将任务分配到每个进程上运行。最后,我们需要调用pool.close()来关闭池子,确保没有新的任务被添加;同时,还需要调用pool.join()来等待所有任务完成并结束进程。
#coding:utf-8
import multiprocessingdef worker(num):"""工作函数"""print('Worker:', num)if __name__ == '__main__':# 获取CPU的核心数量cores = multiprocessing.cpu_count()# 创建进程池对象pool = multiprocessing.Pool(processes=cores)# 提交任务到进程池并等待结果返回results = [pool.apply_async(worker, args=(i,)) for i in range(10000)]# 关闭进程池(不再接收新的任务)pool.close()# 等待所有子进程完成任务后才会继续主进程的运行pool.join()# 输出每个子进程的结果for result in results:print('Result:', result.get())
----------------------------------
('Worker:', 0)
('Worker:', 1)
('Worker:', 2)
('Worker:', 3)
...
('Result:', None)
('Result:', None)
('Result:', None)
这段代码首先通过multiprocessing.cpu_count()获取当前系统的CPU核心数量,然后利用该值创建了一个进程池对象pool。之后,我们将需要处理的任务分配给进程池,并使用apply_async()方法提交任务。最后,调用close()方法关闭进程池,并使用join()方法等待所有子进程完成任务后才能继续主进程的运行。
进程通信
进程之间不共享数据的。如果进程之间需要进行通信,则要用到 Queue 模块或者 Pipe 模块来实现。
Queue
Queue 模块中最常用的方法就是 put
以及 get
方法了,一个是将数据放入队列
中,一个是将数据从队列中取出。其他诸如 empty(),full() 等方法由于多线程或多进程的环境,这些方法是不可靠的,就不进行介绍了。
方法:
-
Queue
multiprocessing.Queue([maxsize])
- maxsize 参数可选,如果填入了参数,则申请一个 maxsize 大小的队列。
-
put
put(obj[, block[, timeout]])
将 obj 放入队列。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full 异常。反之 (block 是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。
-
get
get([block[, timeout]])
从队列中取出并返回对象。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (block 是 False 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。
示例:
有两个进程,一个进程会产生一个随机数,另一个进程需要将上一个进程产生的随机数进行加1操作,我想大部分人都是想的是用全局变量来进行操作,操作如下:
#coding:utf-8
from multiprocessing import Process
import randomdata = 0
def process1():global datadata = random.random()print('产生的data = {}'.format(data))def process2():global datadata = data + 1print('加一后的data = {}'.format(data))if __name__ == '__main__':new_pro1 = Process(target=process1)new_pro1.start()new_pro2 = Process(target=process2)new_pro2.start()
------------------------
产生的data = 0.685302964185
加一后的data = 1
很明显没有达到想要的目的。那么,为什么会这样呢?那是因为每个子进程享有独立的内存空间,接收进程产生的数据不能马上同步到转发进程中。
改为进程通信
#coding:utf-8
from multiprocessing import Process, Queue
import randomdef process1(q):data = random.random()# 将数据放入队列中q.put(data)print('产生的data = {}'.format(data))def process2(q):# 从队列中得到数据data = q.get()data = data + 1print('加一后的data = {}'.format(data))if __name__ == '__main__':# 初始化队列queue = Queue()new_pro1 = Process(target=process1, args=(queue,))new_pro1.start()new_pro2 = Process(target=process2, args=(queue,))new_pro2.start()
-----------------------------
产生的data = 0.265636608983
加一后的data = 1.26563660898
Pipe
如果你创建了很多个子进程,那么其中任何一个子进程都可以对Queue进行存(put)和取(get)。但Pipe不一样,Pipe只提供两个端点,只允许两个子进程进行存(send)和取(recv)。也就是说,Pipe实现了两个子进程之间的通信。
将上面的例子使用 Pipe 进行改动后,程序如下:
#coding:utf-8
from multiprocessing import Process, Pipe
import randomdef process1(conn_1):data = random.random()# 将数据放入管道的一端conn_1.send(data)print('产生的data = {}'.format(data))def process2(conn_2):# 从管道另一端得到数据data = conn_2.recv()data = data + 1print('加一后的data = {}'.format(data))if __name__ == '__main__':# 初始化管道的两端conn_1, conn_2 = Pipe()new_pro1 = Process(target=process1, args=(conn_1,))new_pro1.start()new_pro2 = Process(target=process2, args=(conn_2,))new_pro2.start()
-----------------------------
产生的data = 0.657057237619
加一后的data = 1.65705723762
如果程序是属于CPU密集型,建议使用多进程。