Python运维之多进程!!

本节的快速导航目录如下喔!!!

一、创建进程的类Process

二、进程并发控制之Semaphore

三、进程同步之Lock

四、进程同步之Event

五、进程优先队列Queue

六、多进程之进程池Pool

七、多进程之数据交换Pipe


一、创建进程的类Process

multiprocessing模块提供了一个创建进程的类Process,创建进程有两种方法:

  • 创建一个类的实例Process,并指定任务目标函数
  • 自定义一个类并继承Process类,重写init()方法和run方法

第一种方法:

 import osimport timefrom multiprocessing import Process​# 字进程要执行的代码:数据累加耗时函数def task_process(delay):num = 0for i in range(delay * 10000000):num += iprint(f"进程pid为{os.getpid()},执行完成")​​if __name__ == '__main__':print('父进程pid为 %s.' % os.getpid())t0 = time.time()# 单进程执行task_process(3)task_process(3)t1 = time.time()print(f"顺序执行耗时{t1 - t0}")p0 = Process(target=task_process, args=(3,))p1 = Process(target=task_process, args=(4,))t2 = time.time()# 双进程并行执行p0.start();p1.start()p0.join();p1.join()t3 = time.time()print(f"多进程并发执行耗时{t3 - t2}")# 发现多进程执行相同的操作次数耗时更少

第二种方法(重写方法):

 # -*- coding: UTF-8 -*-import osimport timefrom multiprocessing import Process​class MyProcess(Process):def __init__(self, delay):super().__init__()self.delay = delay​# 子进程要执行代码def run(self):num = 0for i in range(self.delay * 10000000):num += iprint(f"进程pid为{os.getpid()},执行完成")​if __name__ == '__main__':print('父进程pid为 %s.' % os.getpid())p0 = MyProcess(3)p1 = MyProcess(3)t0 = time.time()p0.start()p1.start()p0.join()p1.join()t1 = time.time()print(f"多进程并发执行耗时{t1 - t0}")

Process类的构造函数原型:

 class multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)​'''参数说明如下:Target:表示调用对象,一般为函数,也可以为类。Args:表示调用对象的位置参数元组Kwargs:为进程调用对象的字典Name:为进程的别名Group:参数不使用,可忽略'''

类提供的常用方法如下:

  • is_alive():返回进程是否是激活的
  • join([timeout])阻塞进程,直到进程执行完成或超时或进程被终止
  • run():代表进程执行的任务函数,可被重写
  • start()激活进程
  • terminate():终止进程

其属性如下:

  • authkey:字节码,进程的准密钥
  • daemon父进程终止后自动终止且不能产生新进程,必须在start()之前设置
  • exicode:退出码,进程在运行时为None,如果为-N,就表示被信号N结束
  • name:获取进程名称
  • pid:进程ID

例子:不设置daemon属性:

 # -*- coding: UTF-8 -*-import osimport timefrom multiprocessing import Process​# 子进程要执行的代码def task_process(delay):print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始")print(f"sleep {delay}s")time.sleep(delay)print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束")​if __name__ == '__main__':print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始")p0 = Process(target=task_process,args=(3,))p0.start()print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束")​'''2024-05-08 20:04:09 父进程执行开始2024-05-08 20:04:09 父进程执行结束2024-05-08 20:04:10 子进程执行开始sleep 3s2024-05-08 20:04:13 子进程执行结束'''
'''
没有使用p0.join()来阻塞进程,父进程并没有等待子进程运行完成就打印了退出信息,程序依旧会等待子进程运行完成
'''

例子:设置daemon属性:

 # 在上述代码主程序中添加p0.deamon = True​'''2024-05-08 20:07:11 父进程执行开始2024-05-08 20:07:11 父进程执行结束'''# 程序并没有等待子进程结束而结束,主要主程序结束,程序即退出

二、进程并发控制之Semaphore

semaphore用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数

 # 实例:多进程同步控制import multiprocessingimport time​def worker(s,i):s.acquire()                         print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name+"获得锁运行");time.sleep(i)print(time.strftime('%H:%M:%S'), multiprocessing.current_process().name +"释放锁运行");s.release()​if __name__ == '__main__':s = multiprocessing.Semaphore(2)    # 同一时刻只有两个进程在执行for i in range(6):p = multiprocessing.Process(target=worker(s,2))p.start()

三、进程同步之Lock

多进程的目的是并发执行,提高资源利用率,从而提高效率,但有时候我们需要在某一时间只能有一个进程访问某个共享资源,这种情况就需要使用锁Lock

 # 多个进程输出信息,不加锁:import multiprocessingimport time​def task1():n = 5while n > 1:print(f"{time.strftime('%H:%M:%S')}task1 输出信息")time.sleep(1)n -= 1​def task2():n = 5while n > 1:print(f"{time.strftime('%H:%M:%S')}task2 输出信息")time.sleep(1)n -= 1​def task3():n = 5while n > 1:print(f"{time.strftime('%H:%M:%S')}task3 输出信息")time.sleep(1)n -= 1​if __name__ == '__main__':p1 = multiprocessing.Process(target=task1)p2 = multiprocessing.Process(target=task2)p3 = multiprocessing.Process(target=task3)p1.start();p2.start();p3.start()

未使用锁,生成三个子进程,每个进程都打印自己的信息。下面使用锁:

 import multiprocessingimport time​def task1(lock):with lock:      # 也可使用上下文关键字加锁n = 5while n > 1:print(f"{time.strftime('%H:%M:%S')}task1 输出信息")time.sleep(1)n -= 1​def task2(lock):lock.acquire()n = 5while n > 1:print(f"{time.strftime('%H:%M:%S')}task2 输出信息")time.sleep(1)n -= 1lock.release()​def task3(lock):lock.acquire()  # 调用前后加锁n = 5while n > 1:print(f"{time.strftime('%H:%M:%S')}task3 输出信息")time.sleep(1)n -= 1lock.release()​if __name__ == '__main__':lock = multiprocessing.Lock()       # 1、实例化一个锁p1 = multiprocessing.Process(target=task1,args=(lock,))p2 = multiprocessing.Process(target=task2,args=(lock,))p3 = multiprocessing.Process(target=task3,args=(lock,))p1.start()p2.start()p3.start()

可以发现,同一时刻只有一个进程在输出信息。

四、进程同步之Event

Event用来实现进程之间同步通信

import multiprocessing
import timedef with_for_event(e):e.wait()time.sleep(1)# 唤醒后清除Event状态,为后续继续等待e.clear()print(f"{time.strftime('%H:%M:%S')}:进程A:我们是兄弟,我等你...")e.wait()print(f"{time.strftime('%H:%M:%S')}:进程A:好的,是兄弟一起走")def wait_for_event_timeout(e, t):e.wait()time.sleep(1)e.clear()print(f"{time.strftime('%H:%M:%S')}:进程B:好吧,最多等你{t}秒")e.wait(t)print(f"{time.strftime('%H:%M:%S')}:进程B:我要继续往前走了")if __name__ == '__main__':e = multiprocessing.Event()w1 = multiprocessing.Process(target=with_for_event,args=(e,))w2 = multiprocessing.Process(target=wait_for_event_timeout,args=(e,5))w1.start()w2.start()# 主进程发话print(f"{time.strftime('%H:%M:%S')}:主进程:谁等我下,我需要8秒时间")# 唤醒等待的进程e.set()time.sleep(8)print(f"{time.strftime('%H:%M:%S')}:好的,我赶上了")# 再次唤醒等待的进程e.set()w1.join()w2.join()print(f"{time.strftime('%H:%M:%S')}:主进程:退出")

上述定义了两个函数,一个用于等待事件的发生;一个用于等待事件发生并设置超时时间。主进程调用事件的set()方法唤醒等待事件的进程,事件唤醒后调用claer()方法清除事件的状态并重新等待,以此达到进程同步的控制。

五、进程优先队列Queue

  • 多进程安全:Queue是为多进程环境设计的,确保在多进程之间进行数据传递时的安全性。
  • put方法
    • 用于向队列中插入数据
    • 有两个可选参数:blocked timeout
    • 默认情况下,blocked 设置为 True
    • 如果 timeout 是一个正值,并且队列已满,put 方法会阻塞直到队列有空间或者超时
    • 超时会抛出 Queue.Full 异常。
    • 如果 blocked 设置为 False 且队列已满,会立即抛出 Queue.Full 异常。
  • get方法
    • 用于从队列中读取并删除一个元素
    • 同样有两个可选参数:blockedtimeout
    • 默认情况下,blocked 设置为 True
    • 如果 timeout 是一个正值,并且在指定时间内队列中没有元素可取,会抛出 Queue.Empty 异常。
    • 如果block设置为False则有两种情况:
      • 如果队列为空,会立即抛出 Queue.Empty 异常。
      • 如果队列中有值可用,会立即返回该值。

这些特性确保了在多进程环境中,队列操作的同步性和数据的一致性。

# 实现消费者-生产者模式
from multiprocessing import Process,Queue
import timedef ProducerA(q):count = 1while True:q.put(f"冷饮 {count}")print(f"{time.strftime('%H:%M:%S')} A 放入:[冷饮 {count}]")count +=1time.sleep(1)def ConsumerB(q):while True:print(f"{time.strftime('%H:%M:%S')} B 放入:[取出 {q.get()}]")time.sleep(5)if __name__ == '__main__':q = Queue(maxsize=5)p = Process(target=ProducerA,args=(q,))c = Process(target=ConsumerB,args=(q,))c.start()p.start()c.join()p.join()

上述代码定义了生产者函数和消费者函数,设置其队列的最大容量是5,生产者不停的生产冷饮,消费者就不停的取出冷饮消费,当队列满时,生产者等待,当队列空时,消费者等待。

六、多进程之进程池Pool

进程池(Pool)按需创建进程,池满则请求排队等待

import multiprocessing
import timedef task(name):print(f"{time.strftime('%H:%M:%S')}: {name} 开始执行")time.sleep(3)if __name__ == '__main__':pool = multiprocessing.Pool(processes=3)for i in range(10):# 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去pool.apply_async(func=task, args=(i,))pool.close()pool.join()

从结果来看,同一时刻只有三个进程在执行,使用Pool实现了对进程并发数的控制

七、多进程之数据交换Pipe

在Python的多进程编程中,multiprocessing.Pipe() 方法可以创建一个管道,用于进程间的数据传输。

默认情况下,管道是全双工的,即两个进程可以互相发送和接收数据。如果需要设置为半双工(单向传输),可以通过传递参数duplex=False 来实现。

管道对象提供了send() 方法用于发送消息,以及recv() 方法用于接收消息。如果recv()在没有消息的情况下被调用,它会阻塞;如果管道已关闭,recv()会抛出EOFError异常。

import multiprocessing
import timedef task1(pipe):for i in range(5):str = f"task1-{i}"print(f"{time.strftime('%H:%M:%S')}: task1 发送:{str}")pipe.send(str)time.sleep(2)for i in range(5):print(f"{time.strftime('%H:%M:%S')}: task1 接收:{ pipe.recv() }")def task2(pipe):for i in range(5):print(f"{time.strftime('%H:%M:%S')}: task2 接收:{pipe.recv()}")time.sleep(2)for i in range(5):str = f"task1-{i}"print(f"{time.strftime('%H:%M:%S')}: task2 发送:{ str }")if __name__ == '__main__':pipe = multiprocessing.Pipe()p1 = multiprocessing.Process(target=task1,args=(pipe[0],))p2 = multiprocessing.Process(target=task2,args=(pipe[1],))p1.start()p2.start()p1.join()p2.join()

首先程序定义了两个子进程函数:task1先发送5条消息,再接收消息;task2先接收消息,再发送消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/9198.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【 书生·浦语大模型实战营】学习笔记(六):Lagent AgentLego 智能体应用搭建

🎉AI学习星球推荐: GoAI的学习社区 知识星球是一个致力于提供《机器学习 | 深度学习 | CV | NLP | 大模型 | 多模态 | AIGC 》各个最新AI方向综述、论文等成体系的学习资料,配有全面而有深度的专栏内容,包括不限于 前沿论文解读、…

暗区突围国际服pc端海外版如何快速致富 暗区突围pc端怎么赚钱

暗区突围是一款由腾讯魔方工作室研发的高拟真硬核射击手游,以现代战争为游戏题材,采用了全新的u3d引擎打造,整体游戏画风逼真写实,搭配上优秀的射击玩法,辅以史诗级的背景配乐,致力于带给玩家无与伦比的枪战…

【因特网中自治系统内部的路由选择,RIP 进程处理 OSPFOSPF(Open Shortest Path First)最短路径优先协议】

文章目录 因特网中自治系统内部的路由选择RIP(Routing Information Protocol)内部网关协议RIP通告(advertisements)RIP: 链路失效和恢复RIP 进程处理OSPF(Open Shortest Path First)最短路径优先协议OSPF “高级” 特性(在RIP中的…

SEO之高级搜索指令(三)

初创企业需要建站的朋友看这篇文章,谢谢支持: 我给不会敲代码又想搭建网站的人建议 新手上云 (接上一篇。。。。) 11、link: link:也是SEO 常用的指令,用来搜索某个url的反向链接,既包括内部链接&#xf…

【C++11新特性】lambda表达式和应用场景

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…

力扣每日一题112:路径总和

题目 简单 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。判断该树中是否存在 根节点到叶子节点 的路径&#xff0c;这条路径上所有节点值相加等于目标和 targetSum 。如果存在&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 叶子节点 是…

java入门1.1.1版本

前言&#xff1a; 上面的内容是1.0.0~1.1的内容总结 秉持着先做再定义的理念&#xff0c;这里会带着大家先体验一下类与对象 第一步&#xff1a;新建一个java文件 鼠标右键 → 新建 → 文本文档 → 右键 → 点击重名 → 全选 → hello.java 第二步&#xff1a;用笔记本打开 …

C++进阶:map与set简单自实现

目录 1. map与set封装红黑树的方式1.1 大概实现思路1.2 红黑树模板抽象1.3 红黑树的迭代器 2. 红黑树模板的实现2.1 结点结构的定义2.2 红黑树迭代器的实现2.2.1 迭代器的结构2.2.2 迭代器的方法实现 2.3 树结构的定义2.4 红黑树接口实现2.4.1 插入2.4.2 查找2.4.3 迭代器相关 …

pytest + yaml 框架 - 参数化读取文件路径优化

针对小伙伴提出参数化时读取外部文件&#xff0c;在项目根路径运行没问题&#xff0c;但是进入到项目下子文件夹运行用例&#xff0c;就会找不到文件问题做了优化。 关于参数化读取外部文件相关内容参考前面这篇pytest yaml 框架 -25.参数化数据支持读取外部文件txt/csv/json/…

随手笔记-GNN(朴素图神经网络)

自己看代码随手写的一点备忘录&#xff0c;自己看的&#xff0c;不喜勿喷 GNN (《------ 代码) 刚开始我还在怀疑为什么没有加weigth bias&#xff0c;已经为什么权重才两个&#xff0c;原来是对node_feats进行的network的传播&#xff0c;而且自己内部直接进行了。 下面是一…

【api接口开通教程】YouTube Data API v3申请流程

一、背景调查 1.1 API接口介绍 采集youtube数据&#xff0c;大体分为两种方案&#xff1a;一种是基于爬虫&#xff0c;一种是基于API接口。 说人话就是&#xff1a;爬虫相当于走后门、爬窗户&#xff08;利用技术手段窃取&#xff0c;人家没说给&#xff0c;但我硬拿&#x…

ssrf漏洞学习——基础知识

一、SSRF是什么&#xff1f; SSRF(Server-Side Request Forgery:服务器端请求伪造) 是一种由攻击者构造形成由服务端发起请求的一个安全漏洞。 一般情况下&#xff0c;SSRF攻击的目标是从外网无法访问的内部系统。&#xff08;正是因为它是由服务端发起的&#xff0c;所以它能…

leetcode每日一题第七十二天

class Solution { public:TreeNode* searchBST(TreeNode* root, int val) {if(!root) return root;if(root->val val) return root;else if(root->val > val) return searchBST(root->left,val);else return searchBST(root->right,val);} };

日志打印传值 传引用 右值引用性能测试(Linux/QNX)

结论 Linux平台和qnx平台优化后传值性能都是比传引用的差&#xff0c;也比传右值的差&#xff0c;因此传参时有必要传递引用。 测试代码 #include <cstdint> #include <ctime> #include <string>#ifdef __linux__#define ITERATIONS 10000000 #else#defin…

(七)JSP教程——session对象

浏览器和Web服务器之间的交互通过HTTP协议来完成&#xff0c;HTTP协议是一种无状态的协议&#xff0c;服务器端无法保留浏览器每次与服务器的连接信息&#xff0c;无法判断每次连接的是否为同一客户端。为了让服务器端记住客户端的连接信息&#xff0c;可以使用session对象来记…

STM32--4G DTU 及 阿里云

模块概述 ATK-IDM750C/IDM751C 是正点原子(ALIENTEK)团队开发的一款高性能 4G Cat1 DTU 产品&#xff0c; 支持移动 4G、联通 4G 和电信 4G 手机卡。它以高速率、低延迟和无线数传作为核心功能&#xff0c; 可快速解决应用场景下的无线数传方案。 它支持 TCP/UDP/HTTP/MQTT/DN…

kafka(七)——消息偏移(消费者)

概念 消费者消费完消息后&#xff0c;向_consumer_offset主题发送消息&#xff0c;用来保存每个分区的偏移量。 流程说明 consumer发送JoinGroup请求&#xff1b;coordinator选出一个consumer作为leader&#xff0c;并将topics发送给leader消费者&#xff1b;leader consumer…

全网最详细的Python自动化测试(unittest框架)

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

ssm105基于JAVAEE技术校园车辆管理系统+jsp

校园车辆管理系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本校园车辆管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短…

gtest的编译与使用

文章目录 gtest的编译与使用概述笔记CMake参数官方文档测试程序测试效果END gtest的编译与使用 概述 gTest是 googletest的缩写&#xff0c;如果直接找gTest项目&#xff0c;是找不到的。 库地址 https://github.com/google/googletest.git 迁出到本地后&#xff0c;切到最新…