多进程补充
僵尸进程和孤儿进程
基于unix环境(linux,macOS)
主进程需要等待子进程结束之后,主进程才结束
主进程时刻监测子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收.
为什么主进程不在子进程结束后马上对其回收呢?
- 主进程与子进程是异步关系.主进程无法马上捕获子进程什么时候结束.
- 如果子进程结束之后马上再内存中释放资源,主进程就没有办法监测子进程的状态了.
unix针对于上面的问题,提供了一个机制.
所有的子进程结束之后,立马会释放掉文件的操作链接,内存的大部分数据,但是会保留一些内容: 进程号,结束时间,运行状态,等待主进程监测,回收.
僵尸进程: 所有的子进程结束之后,在被主进程回收之前,都会进入僵尸进程状态.
僵尸进程有无危害???
如果父进程不对僵尸进程进行回收(wait/waitpid),产生大量的僵尸进程,这样就会占用内存,占用进程pid号.
孤儿进程:
父进程由于某种原因结束了,但是你的子进程还在运行中,这样你的这些子进程就成了孤儿进程.你的父进程如果结束了,你的所有的孤儿进程就会被init进程的回收,init就变成了你的父进程,对你进行回收.
僵尸进程如何解决???
父进程产生了大量子进程,但是不回收,这样就会形成大量的僵尸进程,解决方式就是直接杀死父进程,将所有的僵尸进程变成孤儿进程进程,由init进行回收.
互斥锁
互斥锁就是在保证子进程串行的同时,也保证了子进程执行顺序的随机性,以及数据的安全性
# 三个同事 同时用一个打印机打印内容.
# 三个进程模拟三个同事, 输出平台模拟打印机.
版本一:
from multiprocessing import Process
import time
import random
import osdef task1():print(f'{os.getpid()}开始打印了')time.sleep(random.randint(1,3))print(f'{os.getpid()}打印结束了')def task2():print(f'{os.getpid()}开始打印了')time.sleep(random.randint(1,3))print(f'{os.getpid()}打印结束了')def task3():print(f'{os.getpid()}开始打印了')time.sleep(random.randint(1,3))print(f'{os.getpid()}打印结束了')if __name__ == '__main__':p1 = Process(target=task1)p2 = Process(target=task2)p3 = Process(target=task3)p1.start()p2.start()p3.start()
# 现在是所有的进程都并发的抢占打印机,
# 并发是以效率优先的,但是目前我们的需求: 顺序优先.
# 多个进程共强一个资源时, 要保证顺序优先: 串行,一个一个来.
版本二:from multiprocessing import Process
import time
import random
import osdef task1(p):print(f'{p}开始打印了')time.sleep(random.randint(1,3))print(f'{p}打印结束了')def task2(p):print(f'{p}开始打印了')time.sleep(random.randint(1,3))print(f'{p}打印结束了')def task3(p):print(f'{p}开始打印了')time.sleep(random.randint(1,3))print(f'{p}打印结束了')if __name__ == '__main__':p1 = Process(target=task1,args=('p1',))p2 = Process(target=task2,args=('p2',))p3 = Process(target=task3,args=('p3',))p2.start()p2.join()p1.start()p1.join()p3.start()p3.join()
# 我们利用join 解决串行的问题,保证了顺序优先,但是这个谁先谁后是固定的.
# 这样不合理. 你在争抢同一个资源的时候,应该是先到先得,保证公平.
版本3:from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import osdef task1(p,lock):'''一把锁不能连续锁两次lock.acquire()lock.acquire()lock.release()lock.release()'''lock.acquire()print(f'{p}开始打印了')time.sleep(random.randint(1,3))print(f'{p}打印结束了')lock.release()def task2(p,lock):lock.acquire()print(f'{p}开始打印了')time.sleep(random.randint(1,3))print(f'{p}打印结束了')lock.release()def task3(p,lock):lock.acquire()print(f'{p}开始打印了')time.sleep(random.randint(1,3))print(f'{p}打印结束了')lock.release()if __name__ == '__main__':mutex = Lock()p1 = Process(target=task1,args=('p1',mutex))p2 = Process(target=task2,args=('p2',mutex))p3 = Process(target=task3,args=('p3',mutex))p2.start()p1.start()p3.start()
版本四:
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import sys
import os
def task(name,Lock):Lock.acquire()print(f"{name} is running")time.sleep(random.randint(1,4))print(f"{name} is gone")Lock.release()
if __name__ == '__main__':mutex = Lock()for i in range(3):p = Process(target=getattr(sys.modules[__name__],'task'),args=(f"p{i}",mutex))p.start()
lock与join的区别.
共同点: 都可以把并发变成串行, 保证了顺序.
不同点: join人为设定顺序,lock让其争抢顺序,保证了公平性.
# 当很多进程共强一个资源(数据)时, 你要保证顺序(数据的安全),一定要串行.
# 互斥锁: 可以公平性的保证顺序以及数据的安全.# 基于文件的进程之间的通信:# 效率低.# 自己加锁麻烦而且很容易出现死锁.
进程之间的通信
进程在内存级别是隔离的,但是文件在磁盘上,
基于文件通信
from multiprocessing import Process
import json
import time
import os
import randomdef search():time.sleep(random.randint(1,3)) # 模拟网络延迟(查询环节)with open('ticket.json',encoding='utf-8') as f1:dic = json.load(f1)print(f'{os.getpid()} 查看了票数,剩余{dic["count"]}')def paid():with open('ticket.json', encoding='utf-8') as f1:dic = json.load(f1)if dic['count'] > 0:dic['count'] -= 1time.sleep(random.randint(1,3)) # 模拟网络延迟(购买环节)with open('ticket.json', encoding='utf-8',mode='w') as f1:json.dump(dic,f1)print(f'{os.getpid()} 购买成功')def task():search()paid()if __name__ == '__main__':for i in range(6):p = Process(target=task)p.start()
from multiprocessing import Process
from multiprocessing import Lock
import json
import time
import os
import randomdef search():time.sleep(random.randint(1,3)) # 模拟网络延迟(查询环节)with open('ticket.json',encoding='utf-8') as f1:dic = json.load(f1)print(f'{os.getpid()} 查看了票数,剩余{dic["count"]}')def paid():with open('ticket.json', encoding='utf-8') as f1:dic = json.load(f1)if dic['count'] > 0:dic['count'] -= 1time.sleep(random.randint(1,3)) # 模拟网络延迟(购买环节)with open('ticket.json', encoding='utf-8',mode='w') as f1:json.dump(dic,f1)print(f'{os.getpid()} 购买成功')def task(lock):search()lock.acquire()paid()lock.release()if __name__ == '__main__':mutex = Lock()for i in range(6):p = Process(target=task,args=(mutex,))p.start()
基于队列通信
from multiprocessing import Process
from multiprocessing import Queue
import random
import time
import os
def check(q):time.sleep(random.randint(1,3))num = q.qsize()print(f"{os.getpid()}查票,剩余{num}")
def paid(q):time.sleep(random.randint(1,3))try:q.get(block = False)if q.qsize()>=0:print(f"{os.getpid()}购买成功,剩余{q.qsize()}")else:print('没票了')except Exception:print('没票了')
def task(q):check(q)paid(q)
if __name__ == '__main__':q = Queue()for i in range(3):q.put(1)for i in range(10):p = Process(target=task,args=(q,))p.start()