python并发编程3-进程

复习:

# 锁
# 多个进程在同一时间只有一个进程能进入代码去执行# 信号量 Semaphore
from multiprocessing import Semaphore
# 用锁的原理实现的。内置了一个计数器
#在同一时间 只能有指定数量的进程执行某一段被控制住的代码#事件
# wait阻塞收到事件状态控制的同步组件
# 状态 True False  利用is_set查看#true-->false clear()#false-->true  set()
#wait 状态为True不阻塞,状态为False的时候阻塞# 锁 多个进程在同一时间只有一个进程能进入代码去执行
# 信号量 在同一时间 只能有指定数量的进程执行某一段被控制住的代码
# 事件 进程要执行必须等通知
# 进程池 内有固定数目的进程。轮流利用进程#队列
# Queue
#       put() 当队列满的时候阻塞等待队列有空位置
#       get() 当队列空的时候阻塞等待队列有数据
#       full() empty()  不太准确
# JoinableQueue# get()  task_done# put()  join  等待数据取完并处理完

 管道

案例1:

def func(conn):conn.send('hello')
if __name__ =='__main__':conn1, conn2 = Pipe()Process(target=func,args=(conn1,)).start()print(conn2.recv())#运行结果:
# hello

案例2:

def func(conn1,conn2):conn2.close()  # 将不用的链接关闭while True:try:msg=conn1.recv()# if msg is None:breakprint(msg)except EOFError:conn1.close()break
if __name__ =='__main__':conn1, conn2 = Pipe()Process(target=func,args=(conn1,conn2)).start()  #参数必须将conn1,conn2都传过去conn1.close()for i in range(5):conn2.send('hello')# conn2.send(None)conn2.close()

运行结果:

管道实现生产者消费者模型:
 

from multiprocessing import Process,Pipe,Lock
import time
import random
def producer(con,pro,name,food):con.close()for i in range(5):time.sleep(random.random())f='%s生产%s第%i个'%(name,food,i)print(f)pro.send(f)pro.close()
def consumer(con,pro,name):pro.close()while True:try:food=con.recv()print('%s吃了%s'%(name,food))time.sleep(random.random())except EOFError:con.close()breakif __name__ =='__main__':con,pro=Pipe()p=Process(target=producer,args=(con,pro,'egon','饺子'))p.start()c=Process(target=consumer,args=(con,pro,'alex'))c.start()c1 = Process(target=consumer, args=(con, pro, 'Tony'))c1.start()con.close()pro.close()# Pipe 数据不安全性  多个消费者同时取一个数据
# 管道是进程数据不安全的  利用加锁
# IPC

运行结果:

管道实现生产者消费者模型-改进

from multiprocessing import Process,Pipe,Lock
import time
import random
def producer(con,pro,n):con.close()for i in range(n):pro.send(i)pro.send(None)pro.send(None)pro.close()def consumer(lock,con,pro,name):pro.close()while True:lock.acquire()food=con.recv()lock.release()if food:print('%s吃了%s'%(name,food))time.sleep(random.random())else:con.close()breakif __name__ =='__main__':con,pro=Pipe()lock=Lock()p=Process(target=producer,args=(con,pro,10))p.start()c=Process(target=consumer,args=(lock,con,pro,'alex'))c.start()c1 = Process(target=consumer, args=(lock,con, pro, 'Tony'))c1.start()con.close()pro.close()# Pipe 数据不安全性  多个消费者同时取一个数据
# 管道是进程数据不安全的  利用加锁
# IPC
# 队列 进程之间数据安全
# 队列=管道+锁

运行结果:

进程间数据共享

from multiprocessing import Manager,Process,Lock
def main(dict,lock):lock.acquire()dict['count']-=1lock.release()
if __name__ =='__main__':m=Manager()  # object 对象l=Lock()dict=m.dict({'count':100})p_list=[]for i in range(50):p=Process(target=main,args=(dict,l))p.start()p_list.append(p)for i in p_list:i.join()print('主进程:',dict)

运行结果:

进程池-map

# 为什么会有进程池的概念#效率# 每开启进程,开启属于这个进程的内存空间# 寄存器 堆栈 文件# 进程过多 操作系统的调度
# 进程池# python中的 先创建一个属于进程的池子# 这个池子指定能存放多少个进程# 先将这些进程创建好
# 更高级的进程池 python没有# n,m# n 只起n个进程# 加进程# 一直加到上限m个# 进程空闲时,再减少进程,一直减到n个。。。
from multiprocessing import Pool,Process
def func(n):for i in range(2):print(n+1)
def func2(n):# n[0]# n[1]for i in range(2):print(n[0])
# Process 超过5个进程,就用进程池。一般个数是cpu+1
import time
if __name__ =='__main__':start=time.time()pool=Pool(5)  #  5个进程pool.map(func,range(6))  #100个任务,自带join#pool.map(func2,range(100))  # 100个任务,自带joinpool.map(func2, [('alex',1),'egon'])  # 100个任务,自带joint1=time.time()-startstart=time.time()# 等价于:多进程p_list=[]for i in range(6):p=Process(target=func,args=(i,))p_list.append(p)p.start()for p in p_list:p.join()t2 = time.time() - startprint(t1,t2)

运行结果:

进程池-apply

from multiprocessing import Pool,Process
import time,os
def func(n):print('start func%s'%n,os.getpid())time.sleep(1)print('end func%s'%n,os.getpid())if __name__ =='__main__':p=Pool(5)# for i in range(10):#     p.apply(func,args=(i,))  # 同步for i in range(10):p.apply_async(func, args=(i,))  # 异步。主进程结束就结束了。没有等子进程结束p.close()  # 结束进程池接收任务p.join()  # 感知进程池中的任务执行结束。然而进程池中的进程永远是活着的

运行结果:

进程池的返回值

# p=Pool()
# p.map(funcname,iterable)  默认异步的执行任务,且自带close和join
# p.apply() 同步调用
# p.apply_async 异步调用,和主进程完全异步 需要手动close和join'''apply'''
# from multiprocessing import Pool
# def func(i):
#     return i*i
#
# if __name__ =='__main__':
#     p=Pool(5)  # 如果不写5,默认为cpu个数
#     for i in range(10):
#         res=p.apply(func,args=(i,))
#         print(res)'''apply_async异步'''
# from multiprocessing import Pool
# import time
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ =='__main__':
#     p=Pool(5)  # 如果不写5,默认为cpu个数
#     for i in range(10):
#         res=p.apply_async(func,args=(i,))
#         print(res.get()) # 阻塞等待结果'''apply_async异步'''
# from multiprocessing import Pool
# import time
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ =='__main__':
#     p=Pool(5)  # 如果不写5,默认为cpu个数
#     res_list=[]
#     for i in range(10):
#         res=p.apply_async(func,args=(i,))
#         res_list.append(res)
#         # print(res.get()) # 阻塞等待结果
#         print(123)
#     for res in res_list:print(res.get())#map异步
from multiprocessing import Pool
import time
def func(i):time.sleep(0.5)return i*iif __name__ =='__main__':p=Pool(5)  # 如果不写5,默认为cpu个数ret=p.map(func,range(10))print(ret)

运行结果:

进程池的回调函数

# 回调函数
import os
from multiprocessing import Pool
def func1(n):print('func1:',os.getpid())print('in func1')return n*ndef func2(nn):print('func2:',os.getpid())print('in func2')print(nn)if __name__ =='__main__':print('主进程:', os.getpid())p=Pool(5)p.apply_async(func1,args=(8,),callback=func2)p.close()p.join()

运行结果:

回调函数2:

# 回调函数 爬虫用到
# 爬虫 访问一个网址,将数据从网址上下载下来。数据是bytes转成字符串。处理字符串
from multiprocessing import Pool
def func1(n):return n+1
def func2(m):print(m)
if __name__ == '__main__':p=Pool(5)for i in range(10,20):p.apply_async(func1,args=(i,),callback=func2)p.close()p.join()

 运行结果:

回调函数爬虫例子

import requests
from urllib.request import urlopen
from multiprocessing import Pool
# response=requests.get('http://www.baidu.com')
# print(response)
# print(response.__dict__)
# print(response.status_code)
# print(response.content.decode('utf-8'))def get(url):response=requests.get(url)if response.status_code==200:return url,response.content.decode('utf-8')def get_urllib(url):ret=urlopen(url)return ret.read().decode('utf-8')def call_back(args):url,content=argsprint(url,len(content))if __name__ =='__main__':url_list=['http://www.cnblogs.com','http://www.baidu.com','http://www.sogou.com','http://www.suhu.com',]p=Pool(5)for url in url_list:p.apply_async(get,args=(url,),callback=call_back)p.close()p.join()

运行结果:

使用进程池qq聊天:

server端:
 

import socket
from multiprocessing import Pool
def func(conn):conn.send(b'hello')print(conn.recv(1024).decode('utf-8'))conn.close()if __name__ == '__main__':p=Pool(5)sk=socket.socket()sk.bind(('127.0.0.1',8080))sk.listen()while True:conn, addr = sk.accept()p.apply_async(func,args=(conn,))sk.close()

client端:

import socketsk=socket.socket()
sk.connect(('127.0.0.1',8080))ret=sk.recv(1024).decode('utf-8')
print(ret)
msg=input('>>>').encode('utf-8')
sk.send(msg)
sk.close()

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/568317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

测试方法之正交试验

一、正交实验法  正交试验设计(Orthogonal experimental design)是研究多因素多水平的又一种设计方法,它是根据正交性从全面试验中挑选出部分有代表性的点进行试验,这些有代表性的点具备了“均匀分散,齐整可比”的特点,正交试验…

python并发编程4-线程

进程的出现 原来一台服务器只能执行一个任务。 进程的出现,可以让一台服务器处理多个任务。多个任务间进行切换,记录每个任务当前执行到哪里,记录有哪些数据。然后进行切换 每个进程区分开每个任务所能占有的内存空间 进程的缺点 线程的出现…

【Fiddler篇】FreeHttp无限篡改http报文数据调试和mock服务

目录 引言 FreeHttp起源 FreeHttp 插件安装FreeHttp 基本界面一:规则匹配区 1.1:『get http sesion in left session list』获取Session信息1.2:『select url filter method』Url匹配方式1.3:『edit advanced http filter』高级匹…

echarts实现双Y轴之散点和折线图

代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"utf-8"><title></title><script src"echarts.js"></script> </head><body><div id"box" st…

Pytest装饰器@pytest.mark.parametrize一键生成接口正交试验用例

我们在做接口测试时&#xff0c;有时会遇到涉及用例特别多的时候&#xff0c;每个用例都去手动调一遍&#xff0c;很费时费力&#xff0c;也是不现实的&#xff0c;这篇文章我们就解决下这种费时费力的情况. 一、业务需求 某所大学通信系共2个班级&#xff0c;刚考完某一门课…

python列表对应元素合并为列表及判断一个列表是几维

一、合并对应元素 1、两个列表合并 a[1,2,3,4,5] b[2,3,4,5,6] d[] for i in range(len(a)):c []c.append(a[i])c.append(b[i])d.append(c) print(d) 运行结果&#xff1a; 2、一个列表垂直合并 3、一个列表顺序合并 date[] date_temp1[1545225954.721;1545225955.115, …

Pytest脚本中运行用例方式

脚本树如下&#xff1a; test1文件下test_01.py存放test1和test2用例 test1文件下test_02.py存放test1和test2用例 test2文件下test_03.py存放test1和test2用例 test2文件下test_04.py存放test1和test2用例 1、运行所有用例 import pytest if __name__ "__main__&quo…

js 获取当前元素的父元素的父元素的id

情景一&#xff1a;用onclick触发的函数 html代码&#xff1a; <div id"0" style"border-bottom:1px solid #000;margin:0 auto;"><div>1111</div><div class"original"><div id"chartx1" class"cha…

Python Unittest参数化parameterized之数据驱动

一、parameterized介绍 之前我们写过 Unittest第三方库parameterized类似Unittest的DDT、Pytestpytest.mark.parametrize&#xff0c;可以实现参数化用户数据驱动&#xff0c;避免写多个方法&#xff08;冗余&#xff09; 二、安装 pip install parameterized 三、导入 …

echarts图使用tab和下拉切换

方法一&#xff1a;用tab建切换 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><script type"text/javascript" src"jquery-1.12.4.min.js"></scr…

Pytest fixture之request传参

Pytest中我们经常会用到数据参数化&#xff0c;我们来介绍下装饰器pytest.fixture()配合request传参的使用 user request.param 如果想把登录操作放到前置操作里&#xff0c;也就是用到pytest.fixture装饰器&#xff0c;传参就用默认的request参数 user request.param 这一…

使用securecrt在本地与服务器之间上传下载数据

第一种方式&#xff1a; 1、首先安装&#xff1a;apt install lrzsz lrzsz是一款在Linux里可代替ftp上传和下载的程序。 2、设置上传和下载目录&#xff1a;选项--》会话选项--》X/Y/Zmodem 中设置上传和下载目录 3、上传和下载 上传文件只需在shel中输入命令"rz"…

Pytest自定义标记mark及特定运行方式

mark 标记 标记执行指定类 pytest.main([-s,文件名,-m标记名]) pytest.main([-s,test01.py,-mtest]) import pytest pytest.mark.test class Test(object):def test_01(self):print(test_01)def test_02(self):print(test_02) if __name____main__:#运行指定的类pytest.main…

【Python】Error:'int' object is not callable

python错误&#xff1a; TypeError: int object is not callable 经过百度&#xff0c;可能是 你正在调用一个不能被调用的变量或对象&#xff0c;具体表现就是你调用函数、变量的方式错误。 如&#xff1a; max0 a[1.1, 2.0, 3.0, 4.0, 5.0] maxmax(a) 所以在命名变量时一定…

Pytest跳过执行之@pytest.mark.skip()详解大全

一、skip介绍及运用 在我们自动化测试过程中&#xff0c;经常会遇到功能阻塞、功能未实现、环境等一系列外部因素问题导致的一些用例执行不了&#xff0c;这时我们就可以用到跳过skip用例&#xff0c;如果我们注释掉或删除掉&#xff0c;后面还要进行恢复操作。 1、skip跳过成…

python并发编程5-线程

一、复习 # 线程# 线程是进程中的执行单位# 线程是CPU调度的最小单位# 线程之间资源共享## 线程的开启和关闭以及切换的时间开销远远小于进程# 线程本身可以在同一时间使用多个CPU # threading# 使用方法类似于multiprocess # python与线程# CPython解释器在解释代码中容易产生…

Pytest标记用例失败之xfail

项目自动化测试中&#xff0c;如果接口2依赖接口1的响应结果值&#xff0c;或者用例2依赖用例1的响应结果值&#xff0c;自然需要与接口1或用例1进行关联&#xff0c;但是当接口1或用例1执行失败&#xff0c;接口2或用例2一定也是失败的&#xff0c;所以这时不必要再进行接口2和…

python并发编程6-协程

1、基础 # 进程 启动多个进程 进程之间是由操作系统&#xff08;时间片轮转&#xff09;负责调用 # 线程 启动多个线程 真正被CPU执行的最小单位是线程# 开启一个线程 创建一个线程 寄存器 堆栈# 关闭一个线程 等都需要时间 # 协程&#xff1a;本质上是一个线程# 能够在多个任…

JMeter之HTTP请求上传文件/上传图片

Jmeter实现接口上传图片 一、Fiddler抓包上传图片接口 查看WebForms&#xff0c;接口传参为空&#xff0c;文件/图片传参为<file>对用的Name值&#xff1a; Content-Disposition: form-data; name"file"; filename"IMG_20191116_110507.jpg" Con…

js关闭setInterval及终止ajax请求

用clearInterval&#xff08;&#xff09;即可搞定。亲测有效 $(document).ready(function(){c setInterval(checkIsExist,10000);//每10秒执行一次checkIsExist方法 }); function checkIsExist(){$.ajax({type: "POST",url: "/SecondServlet",data: &qu…