python并发编程2-进程

一、信号量

# 多进程中的组件
# ktv
# 4个
# 一套资源 同一时间 只能被n个人访问
# 某一段代码 同一时间 只能被n个进程执行from multiprocessing import Process,Semaphore
import time
import random
def ktv(i,sem):sem.acquire()print('%s走进ktv' %i)time.sleep(random.randint(1,5))print('%s走出ktv' % i)sem.release()if __name__ =='__main__':sem=Semaphore(4)for i in range(10):p=Process(target=ktv,args=(i,sem))p.start()

运行结果:

二、事件

#事件
from multiprocessing import Event
# 一个信号乐意是所有的进程都进入阻塞状态
# 也可以控制所有的进程解除阻塞
#一个事件被创建之后,默认为阻塞状态
e=Event()
print(e.is_set()) #查看一个事件的状态。默认被设置成阻塞
e.set()  #将这个事件的状态改为True
print(e.is_set())
print(1234556)
e.wait() #是依据e.is_set()的值开决定是否阻塞的
print(123456)
e.clear()  #将这个时间的状态改为False
print(e.is_set())
e.wait()  # 等待事件的信号变为True
print('8'*10)# set 和 clear#分别用来修改一个事件的状态 True和False
# is_set 用来查看一个事件的状态
# wait 是依据事件的状态开决定自己是否在wait处阻塞
#   false 是阻塞 True 是不阻塞

运行结果:

事件案例之红绿灯:

from multiprocessing import Event,Process
import time
import random
def cars(e,i):if not e.is_set():print('car%i在等待'%i)e.wait()  #阻塞,知道得到一个时间状态改变成True的信号print('\033[34mcar%i在通过\033[0m' % i)def light(e):while True:if e.is_set():e.clear()print('\033[31m红灯亮了\033[0m')else:e.set()print('\033[32m绿灯亮了\033[0m')time.sleep(2)
if __name__ =='__main__':e=Event()traffic=Process(target=light,args=(e,))traffic.start()for i in range(10):car=Process(target=cars,args=(e,i))car.start()time.sleep(random.random())

运行结果:

三、队列

# 队列 先进先出
#import queue  做不到进程间通信
from multiprocessing import Queue
import time
q=Queue(5)  #表示队列的大小是5
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
print(q.full())  #队列是否满了
# q.put(5)  # 阻塞,直到取出一个值,空出一个地
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
# print(q.get())  #阻塞,直到里面有数据了
while True:try:q.get_nowait()  #阻塞并且会报错except:print('队列为空')time.sleep(5)
# for i in range(6):
#     q.put(i)

运行结果:

队列之案例:

from multiprocessing import Queue,Process
def produce(q):q.put('hello')
def consume(q):print(q.get())if __name__=='__main__':q=Queue()p=Process(target=produce,args=(q,))p.start()c = Process(target=consume, args=(q,))c.start()

运行结果:

生产者消费者模型

# 队列
# 生产者消费者模型
#买包子
# 生产者 进程
# 消费者 进程
import time
import random
from multiprocessing import Queue,Processdef producer(name,food,q):for i in range(6):time.sleep(random.randint(1,3))f='%s生产了%s%s个'%(name,food,i)print(f)q.put(f)def consumer(q,name):while True:food=q.get()if food == None:print('%s获取到一个空了'%name)breakf = '\033[33m%s消费了了%s\033[0m' % (name, food)print(f)time.sleep(random.randint(1, 3))if __name__ =='__main__':q=Queue(20)p1=Process(target=producer,args=('Rgon','包子',q))p2 = Process(target=producer, args=('wusir', '泔水', q))p3 = Process(target=consumer, args=(q, 'alex'))p4 = Process(target=consumer, args=(q, 'jinsir'))p1.start()p2.start()p3.start()p4.start()p1.join()  # 让主进程感知生产者进程的结束p2.join()q.put(None)q.put(None)

运行结果:

可以看到利用手动添加p3.join()方式比较繁琐

生产者消费者模型改进(JoinableQueue)

import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):while True:food=q.get()f = '\033[33m%s消费了了%s\033[0m' % (name, food)print(f)time.sleep(random.randint(1, 3))q.task_done()  # count -1:20 19 18
def producer(name,food,q):for i in range(6):time.sleep(random.randint(1,3))f='%s生产了%s%s个'%(name,food,i)print(f)q.put(f)  #count +1 : 0 1 2 3q.join()  # 阻塞,直到感知一个队列中的数据全部被执行完毕if __name__ =='__main__':q=JoinableQueue(20)p1=Process(target=producer,args=('Rgon','包子',q))p2 = Process(target=producer, args=('wusir', '泔水', q))c1 = Process(target=consumer, args=(q, 'alex'))c2 = Process(target=consumer, args=(q, 'jinsir'))p1.start()p2.start()c1.daemon=True  # 设置为守护进程,主进程中的代码执行完毕之后,子进程自动结束c2.daemon=Truec1.start()c2.start()p1.join()  # 让主进程感知生产者进程的结束p2.join()# 在消费者这端:# 每次获取一个数据# 处理一个数据# 发送一个记号:标志一个数据被处理成功task_done
# 在生产者这端:# 每一次生产一个数据# 且每次生产的数据都放在队列中# 在队列中刻上一个记号# 当生产者全部生产完毕之后,# join信号:已经停止生产数据了# 且要等待之前被刻上的记号都没消费完# 当数据都被处理完毕后,join阻塞结束# consumer中把所有的任务消耗完
# prodecer端的join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join()结束
# 主进程中代码结束
# 守护进程(消费者进程)结束

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/568319.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python Pytest装饰器@pytest.mark.parametrize用例数据驱动(三)

一、测试用例用excel管理,存放地址:C:\Users\wangli\PycharmProjects\Test\test\files\apiCase.xls 二、代码实现如下: 1、封装读取excel用例数据 2、Pytest装饰器pytest.mark.parametrize(参数名,list)实现登录模块2条测试用例数据驱动 im…

python并发编程3-进程

复习: # 锁 # 多个进程在同一时间只有一个进程能进入代码去执行# 信号量 Semaphore from multiprocessing import Semaphore # 用锁的原理实现的。内置了一个计数器 #在同一时间 只能有指定数量的进程执行某一段被控制住的代码#事件 # wait阻塞收到事件状态控制的同…

测试方法之正交试验

一、正交实验法  正交试验设计(Orthogonal experimental design)是研究多因素多水平的又一种设计方法,它是根据正交性从全面试验中挑选出部分有代表性的点进行试验,这些有代表性的点具备了“均匀分散,齐整可比”的特点,正交试验…

python并发编程4-线程

进程的出现 原来一台服务器只能执行一个任务。 进程的出现,可以让一台服务器处理多个任务。多个任务间进行切换,记录每个任务当前执行到哪里,记录有哪些数据。然后进行切换 每个进程区分开每个任务所能占有的内存空间 进程的缺点 线程的出现…

【Fiddler篇】FreeHttp无限篡改http报文数据调试和mock服务

目录 引言 FreeHttp起源 FreeHttp 插件安装FreeHttp 基本界面一:规则匹配区 1.1:『get http sesion in left session list』获取Session信息1.2:『select url filter method』Url匹配方式1.3:『edit advanced http filter』高级匹…

echarts实现双Y轴之散点和折线图

代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"utf-8"><title></title><script src"echarts.js"></script> </head><body><div id"box" st…

Pytest装饰器@pytest.mark.parametrize一键生成接口正交试验用例

我们在做接口测试时&#xff0c;有时会遇到涉及用例特别多的时候&#xff0c;每个用例都去手动调一遍&#xff0c;很费时费力&#xff0c;也是不现实的&#xff0c;这篇文章我们就解决下这种费时费力的情况. 一、业务需求 某所大学通信系共2个班级&#xff0c;刚考完某一门课…

python列表对应元素合并为列表及判断一个列表是几维

一、合并对应元素 1、两个列表合并 a[1,2,3,4,5] b[2,3,4,5,6] d[] for i in range(len(a)):c []c.append(a[i])c.append(b[i])d.append(c) print(d) 运行结果&#xff1a; 2、一个列表垂直合并 3、一个列表顺序合并 date[] date_temp1[1545225954.721;1545225955.115, …

Pytest脚本中运行用例方式

脚本树如下&#xff1a; test1文件下test_01.py存放test1和test2用例 test1文件下test_02.py存放test1和test2用例 test2文件下test_03.py存放test1和test2用例 test2文件下test_04.py存放test1和test2用例 1、运行所有用例 import pytest if __name__ "__main__&quo…

js 获取当前元素的父元素的父元素的id

情景一&#xff1a;用onclick触发的函数 html代码&#xff1a; <div id"0" style"border-bottom:1px solid #000;margin:0 auto;"><div>1111</div><div class"original"><div id"chartx1" class"cha…

Python Unittest参数化parameterized之数据驱动

一、parameterized介绍 之前我们写过 Unittest第三方库parameterized类似Unittest的DDT、Pytestpytest.mark.parametrize&#xff0c;可以实现参数化用户数据驱动&#xff0c;避免写多个方法&#xff08;冗余&#xff09; 二、安装 pip install parameterized 三、导入 …

echarts图使用tab和下拉切换

方法一&#xff1a;用tab建切换 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><script type"text/javascript" src"jquery-1.12.4.min.js"></scr…

Pytest fixture之request传参

Pytest中我们经常会用到数据参数化&#xff0c;我们来介绍下装饰器pytest.fixture()配合request传参的使用 user request.param 如果想把登录操作放到前置操作里&#xff0c;也就是用到pytest.fixture装饰器&#xff0c;传参就用默认的request参数 user request.param 这一…

使用securecrt在本地与服务器之间上传下载数据

第一种方式&#xff1a; 1、首先安装&#xff1a;apt install lrzsz lrzsz是一款在Linux里可代替ftp上传和下载的程序。 2、设置上传和下载目录&#xff1a;选项--》会话选项--》X/Y/Zmodem 中设置上传和下载目录 3、上传和下载 上传文件只需在shel中输入命令"rz"…

Pytest自定义标记mark及特定运行方式

mark 标记 标记执行指定类 pytest.main([-s,文件名,-m标记名]) pytest.main([-s,test01.py,-mtest]) import pytest pytest.mark.test class Test(object):def test_01(self):print(test_01)def test_02(self):print(test_02) if __name____main__:#运行指定的类pytest.main…

【Python】Error:'int' object is not callable

python错误&#xff1a; TypeError: int object is not callable 经过百度&#xff0c;可能是 你正在调用一个不能被调用的变量或对象&#xff0c;具体表现就是你调用函数、变量的方式错误。 如&#xff1a; max0 a[1.1, 2.0, 3.0, 4.0, 5.0] maxmax(a) 所以在命名变量时一定…

Pytest跳过执行之@pytest.mark.skip()详解大全

一、skip介绍及运用 在我们自动化测试过程中&#xff0c;经常会遇到功能阻塞、功能未实现、环境等一系列外部因素问题导致的一些用例执行不了&#xff0c;这时我们就可以用到跳过skip用例&#xff0c;如果我们注释掉或删除掉&#xff0c;后面还要进行恢复操作。 1、skip跳过成…

python并发编程5-线程

一、复习 # 线程# 线程是进程中的执行单位# 线程是CPU调度的最小单位# 线程之间资源共享## 线程的开启和关闭以及切换的时间开销远远小于进程# 线程本身可以在同一时间使用多个CPU # threading# 使用方法类似于multiprocess # python与线程# CPython解释器在解释代码中容易产生…

Pytest标记用例失败之xfail

项目自动化测试中&#xff0c;如果接口2依赖接口1的响应结果值&#xff0c;或者用例2依赖用例1的响应结果值&#xff0c;自然需要与接口1或用例1进行关联&#xff0c;但是当接口1或用例1执行失败&#xff0c;接口2或用例2一定也是失败的&#xff0c;所以这时不必要再进行接口2和…

python并发编程6-协程

1、基础 # 进程 启动多个进程 进程之间是由操作系统&#xff08;时间片轮转&#xff09;负责调用 # 线程 启动多个线程 真正被CPU执行的最小单位是线程# 开启一个线程 创建一个线程 寄存器 堆栈# 关闭一个线程 等都需要时间 # 协程&#xff1a;本质上是一个线程# 能够在多个任…