一、信号量
# 多进程中的组件
# ktv
# 4个
# 一套资源 同一时间 只能被n个人访问
# 某一段代码 同一时间 只能被n个进程执行from multiprocessing import Process,Semaphore
import time
import random
def ktv(i,sem):sem.acquire()print('%s走进ktv' %i)time.sleep(random.randint(1,5))print('%s走出ktv' % i)sem.release()if __name__ =='__main__':sem=Semaphore(4)for i in range(10):p=Process(target=ktv,args=(i,sem))p.start()
运行结果:
二、事件
#事件
from multiprocessing import Event
# 一个信号乐意是所有的进程都进入阻塞状态
# 也可以控制所有的进程解除阻塞
#一个事件被创建之后,默认为阻塞状态
e=Event()
print(e.is_set()) #查看一个事件的状态。默认被设置成阻塞
e.set() #将这个事件的状态改为True
print(e.is_set())
print(1234556)
e.wait() #是依据e.is_set()的值开决定是否阻塞的
print(123456)
e.clear() #将这个时间的状态改为False
print(e.is_set())
e.wait() # 等待事件的信号变为True
print('8'*10)# set 和 clear#分别用来修改一个事件的状态 True和False
# is_set 用来查看一个事件的状态
# wait 是依据事件的状态开决定自己是否在wait处阻塞
# false 是阻塞 True 是不阻塞
运行结果:
事件案例之红绿灯:
from multiprocessing import Event,Process
import time
import random
def cars(e,i):if not e.is_set():print('car%i在等待'%i)e.wait() #阻塞,知道得到一个时间状态改变成True的信号print('\033[34mcar%i在通过\033[0m' % i)def light(e):while True:if e.is_set():e.clear()print('\033[31m红灯亮了\033[0m')else:e.set()print('\033[32m绿灯亮了\033[0m')time.sleep(2)
if __name__ =='__main__':e=Event()traffic=Process(target=light,args=(e,))traffic.start()for i in range(10):car=Process(target=cars,args=(e,i))car.start()time.sleep(random.random())
运行结果:
三、队列
# 队列 先进先出
#import queue 做不到进程间通信
from multiprocessing import Queue
import time
q=Queue(5) #表示队列的大小是5
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
print(q.full()) #队列是否满了
# q.put(5) # 阻塞,直到取出一个值,空出一个地
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
# print(q.get()) #阻塞,直到里面有数据了
while True:try:q.get_nowait() #阻塞并且会报错except:print('队列为空')time.sleep(5)
# for i in range(6):
# q.put(i)
运行结果:
队列之案例:
from multiprocessing import Queue,Process
def produce(q):q.put('hello')
def consume(q):print(q.get())if __name__=='__main__':q=Queue()p=Process(target=produce,args=(q,))p.start()c = Process(target=consume, args=(q,))c.start()
运行结果:
生产者消费者模型
# 队列
# 生产者消费者模型
#买包子
# 生产者 进程
# 消费者 进程
import time
import random
from multiprocessing import Queue,Processdef producer(name,food,q):for i in range(6):time.sleep(random.randint(1,3))f='%s生产了%s%s个'%(name,food,i)print(f)q.put(f)def consumer(q,name):while True:food=q.get()if food == None:print('%s获取到一个空了'%name)breakf = '\033[33m%s消费了了%s\033[0m' % (name, food)print(f)time.sleep(random.randint(1, 3))if __name__ =='__main__':q=Queue(20)p1=Process(target=producer,args=('Rgon','包子',q))p2 = Process(target=producer, args=('wusir', '泔水', q))p3 = Process(target=consumer, args=(q, 'alex'))p4 = Process(target=consumer, args=(q, 'jinsir'))p1.start()p2.start()p3.start()p4.start()p1.join() # 让主进程感知生产者进程的结束p2.join()q.put(None)q.put(None)
运行结果:
可以看到利用手动添加p3.join()方式比较繁琐
生产者消费者模型改进(JoinableQueue)
import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):while True:food=q.get()f = '\033[33m%s消费了了%s\033[0m' % (name, food)print(f)time.sleep(random.randint(1, 3))q.task_done() # count -1:20 19 18
def producer(name,food,q):for i in range(6):time.sleep(random.randint(1,3))f='%s生产了%s%s个'%(name,food,i)print(f)q.put(f) #count +1 : 0 1 2 3q.join() # 阻塞,直到感知一个队列中的数据全部被执行完毕if __name__ =='__main__':q=JoinableQueue(20)p1=Process(target=producer,args=('Rgon','包子',q))p2 = Process(target=producer, args=('wusir', '泔水', q))c1 = Process(target=consumer, args=(q, 'alex'))c2 = Process(target=consumer, args=(q, 'jinsir'))p1.start()p2.start()c1.daemon=True # 设置为守护进程,主进程中的代码执行完毕之后,子进程自动结束c2.daemon=Truec1.start()c2.start()p1.join() # 让主进程感知生产者进程的结束p2.join()# 在消费者这端:# 每次获取一个数据# 处理一个数据# 发送一个记号:标志一个数据被处理成功task_done
# 在生产者这端:# 每一次生产一个数据# 且每次生产的数据都放在队列中# 在队列中刻上一个记号# 当生产者全部生产完毕之后,# join信号:已经停止生产数据了# 且要等待之前被刻上的记号都没消费完# 当数据都被处理完毕后,join阻塞结束# consumer中把所有的任务消耗完
# prodecer端的join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join()结束
# 主进程中代码结束
# 守护进程(消费者进程)结束
运行结果: