创建进程
os.fork()
该方法只能在linux和mac os中使用,因为其主要基于系统的fork来实现。window中没有这个方法。
通过os.fork()方法会创建一个子进程,子进程的程序集为该语句下方的所有语句。
import osprint("主进程的PID为:" , os.getpid())w = 1pid = os.fork() # 创建子进程print('fork方法的返回值为: ', pid)if pid == 0:print(f'子进程PID: {os.getpid()}, 主进程PID: {os.getppid()}, 子进程中w: {w}')else:print(f'主进程PID: {os.getpid()}, 主进程PID: {os.getppid()}, 子进程中w: {w}')
multiprocessing(target为函数)
通过multiprocessing模块中的Process类创建一个进程实例对象,并通过其start方法启动该进程。
进程中的程序集为Process类的target参数,可以是一个函数也可以是一个方法。
需要注意的是windows系统中创建进程的过程需要放在if __name__== "__main__"
代码块中,因为其实现数据集的复制时,是通过import语句实现。
而在Linux和MacOS系统下则不需要,因为他们原生支持fork方法。
import multiprocessingimport osimport timedef task():for i in range(3):print("wating... 当前pid为 : ", os.getpid(), "父进程为: ", os.getppid())time.sleep(1)if __name__ == "__main__":print('主进程pid:', os.getpid())process = multiprocessing.Process(target=task)process.start()
multiprocessing(taget为方法)
创建Process实例对象的target参数,不仅可以是函数名,还可以是类的方法名。
- 如果需要往target中传入参数,可以通过args和kwargs两个参数进行相应的传参
import multiprocessingimport timeclass Tasks():def task1(self):time.sleep(1)print('task1')def task2(self):time.sleep(1)print('task2')if __name__ == "__main__":t = Tasks()process1 = multiprocessing.Process(target=t.task1)process1.start()process2 = multiprocessing.Process(target=t.task2)process2.start()
继承Process类
通过继承multiprocessing.Process
类,并重写其中的run方法。
- 必须要重写
run
方法,Process子类的实例对象的run方法,就是进程执行的程序
import timefrom multiprocessing import Processclass MyProcess(Process):def __init__(self, i):super().__init__()self.name = str(i)def run(self):time.sleep(2)print(f'子进程-{self.name}')if __name__ == '__main__':for i in range(5):p = MyProcess(i)p.start()print('主进程')
进程阻塞
目前系统一般都是多核的,当处理多任务时,一般都以并发或并行的方式处理多任务。所以系统一般以异步的方式处理多进程。
在Process
的实例方法中,通过join
方法表示进程阻塞时,主将处于等待状态,并不会处理其他进程。
单进程阻塞
针对每个进程开启后立马启用join方法,这种方法效率低下。使得系统的处理方式编程同步阻塞
,使得主进程依次处理子进程。
import timefrom multiprocessing import Processdef eat():time.sleep(2)print('eat')def drink():time.sleep(2)print('drink')if __name__ == '__main__':process1 = Process(target=eat)process1.start()process1.join()process2 = Process(target=drink)process2.start()process2.join()print('主进程')
多进程阻塞
先利用start方法将多个进程同时创建并启动,然后在创建完成后统一阻塞进程。
- 统一创建进程,并让其统一运行
- 统一等待进程结束,避免每个进程都等一段时间
import timefrom multiprocessing import Processdef eat():time.sleep(2)print('eat')def drink():time.sleep(2)print('drink')if __name__ == '__main__':process1 = Process(target=eat)process1.start()process2 = Process(target=drink)process2.start()for p in [process1, process2]:p.join()print('主进程')
进程锁
当多进程编辑同一文件或数据时,往往会导致数据不一致问题,针对这种情况,需要在进程中对处理文件或数据的代码前后进行加锁和解锁操作。
如果没有锁,会导致数据的不一致
import time, jsonfrom multiprocessing import Processdef read_ticket(user):with open('ticket.txt') as f:num = json.load(f)['ticket']time.sleep(1)print(f'User {user}: 当前剩余{num}张票')return numdef order_ticket(user, num):time.sleep(1)num -= 1with open('ticket.txt', 'w') as f:json.dump({'ticket': num}, f)print(f'User {user}: 购票成功')def ticket(user):num = read_ticket(user)if num > 0:order_ticket(user, num)else:print(f'User {user}: 购票失败')if __name__ == '__main__':queue = []for i in range(5):p = Process(target=ticket, args=(i,))p.start()queue.append(p)for q in queue:q.join()print('运行结束')
加锁/解锁
在编辑数据的之前通过acquire
方法加锁,当数据编辑完成后,通过release
方法解锁。
- 在主进程中创建一个锁对象
- 然后在每个修改共同数据的进程中传入已经创建的锁对象
- 在修改数据的代码前后分别加锁和解锁
"""@Time: 2024/6/28 20:18@Author: 'Ethan'@Email: ethanzhou4406@outlook.com@File: 1. 同步阻塞.py@Project: python@Feature:"""import time, jsonfrom multiprocessing import Process, Lockdef read_ticket(user):with open('ticket.txt') as f:num = json.load(f)['ticket']time.sleep(0.1)print(f'User {user}: 当前剩余{num}张票')def order_ticket(user):time.sleep(0.1)with open('ticket.txt') as f:num = json.load(f)['ticket']if num > 0:with open('ticket.txt', 'w') as f:num -= 1json.dump({'ticket': num}, f)print(f'User {user}: 购票成功')else:print(f'User {user}: 购票失败')def ticket(user,lock):read_ticket(user)lock.acquire()order_ticket(user)lock.release()if __name__ == '__main__':lock = Lock()queue = []for i in range(5):p = Process(target=ticket, args=(i, lock))p.start()queue.append(p)for q in queue:q.join()print('运行结束')
锁的上下文管理器
如果在代码加锁后,解锁前,代码出现了异常就会导致进程没有来得及解锁,而导致死锁现象。通过锁的上下文管理器语法,可以有效避免这种情况的发生。
import time, jsonfrom multiprocessing import Process, Lockdef read_ticket(user):with open('ticket.txt') as f:num = json.load(f)['ticket']time.sleep(0.1)print(f'User {user}: 当前剩余{num}张票')def order_ticket(user):time.sleep(0.1)with open('ticket.txt') as f:num = json.load(f)['ticket']if num > 0:with open('ticket.txt', 'w') as f:num -= 1json.dump({'ticket': num}, f)print(f'User {user}: 购票成功')else:print(f'User {user}: 购票失败')def ticket(user,lock):read_ticket(user)with lock:order_ticket(user)if __name__ == '__main__':lock = Lock()queue = []for i in range(5):p = Process(target=ticket, args=(i, lock))p.start()queue.append(p)for q in queue:q.join()print('运行结束')
进程间通信
进程之间可以进行通信,主要是通过各个进程之中传入一个公共的沟通工具,所有的进程都通过这个工具进行沟通。multiprocessing中提供了两种进程间沟通的工具Queue
和Pipe
Queue方式
Queue是基于文件传输的socket通信方式,并且它是带锁机制的。它的数据主要的特点是先进先出,后进后出。
当一个对象被放入一个队列中时,这个对象首先会被一个后台线程用pickle
序列化,并将序列化后的数据通过一个底层管道的管道传递给队列中。
主要使用如下方法:
- qsize(): 返回队列的大致的长度。返回的值由于多线程或多进程的上下文而变得不可靠
- empty(): 队列为空返回True,否则返回False。返回的值由于多线程或多进程的上下文而变得不可靠
- full(): 队列满了返回True,否则返回False。返回的值由于多线程或多进程的上下文而变得不可靠
- put(obj[, block[, timeout]]): 将obj放入队列。
- 如果block为True(默认值)而且timeout是None(默认值),将会阻塞当前进程,直到有空的缓冲槽。
- 如果timeout是正数,将会阻塞了最多timeout秒之后还是没有可用的缓冲槽时抛出
queue.Full
异常 - 反之block为False,仅当有可用缓冲槽时才放入对象,否则抛出
queue.Full
异常(这种情况下timeout参数会被忽略)
- get([block[, timeout]]): 从队列中取出并返回对象。如果可选参数block是True而且timeout是None,将会阻塞当前进程,直到队列中出现可用对象。如果timeout是正数,将会阻塞了最多timeout秒之后还是没有可用的对象时抛出
queue.Empty
异常。- 反之,block是False时,仅当有可用对象能够取出时返回,否则抛出
queue.Empty
异常(这种情况下timeout参数会被忽略)
- 反之,block是False时,仅当有可用对象能够取出时返回,否则抛出
import time, jsonfrom multiprocessing import Process, Queuedef task(i, queue: Queue):time.sleep(1)queue.put(i)print(f'task {i}, 入列')if __name__ == '__main__':queue = Queue()process_queue = []for i in range(5):p = Process(target=task, args=(i, queue))p.start()process_queue.append(p)for p in process_queue:p.join()for i in range(5):print(f'主进程中消费队列内容{queue.get()}')print('运行结束')
Pipe方式
Pipe方式是进程之间通信的另一种方式和Queue不同之处在于,它不带锁,且信息顺序无法得到保障。
主要的使用方法:
- send(obj): 将一个对象发送到链接的另一端,可以用
recv()
读取,发送的对象必须是可序列化的,多大的对象(接近32MiB)可能引发ValueError
异常 - recv(): 返回一个由另一端使用
send()
发送的对象,该方法会一直阻塞直到接收到对象。如果对端关闭了链接或者没有东西可接收,将抛出EOFError
异常
import timefrom multiprocessing import Process, Pipefrom multiprocessing.connection import Connectiondef task(pipe:Connection):print('子进程往管道里加了内容')time.sleep(1)pipe.send("子进程往管道中加了点东西")if __name__ == '__main__':pipe1, pipe2 = Pipe()p = Process(target=task, args=(pipe1,))p.start()p.join()print('主进程中获取管道内的内容为:', pipe2.recv())print('运行结束')