python进程池pool_python多任务--进程池Pool

进程池Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

一:使用进程池

例1:非阻塞

from multiprocessing import Pool

import os, time, random

def worker(name):

t_start = time.time()

print("%s开始执行,进程号为%d" % (name, os.getpid()))

# random.random()随机生成0~1之间的浮点数

time.sleep(random.random() * 2)

t_stop = time.time()

print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))

def main():

po = Pool(5) # 定义一个进程池,最大进程数5

# 往进程池中添加任务

for i in range(10):

# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))

# 每次循环将会用空闲出来的子进程去调用目标

po.apply_async(worker, (f'liang{i}',))

print("----start----")

po.close() # 关闭进程池,关闭后po不再接收新的请求

po.join() # 等待po中所有子进程执行完成,必须放在close语句之后

print("----all_done----")

if __name__ == '__main__':

main()

执行结果

----start----

liang0开始执行,进程号为10404

liang1开始执行,进程号为9920

liang2开始执行,进程号为13136

liang3开始执行,进程号为10180

liang4开始执行,进程号为7708

liang4 执行完毕,耗时0.57

liang5开始执行,进程号为7708

liang2 执行完毕,耗时1.20

liang6开始执行,进程号为13136

liang1 执行完毕,耗时1.33

liang7开始执行,进程号为9920

liang0 执行完毕,耗时1.34

liang8开始执行,进程号为10404

liang3 执行完毕,耗时1.96

liang9开始执行,进程号为10180

liang5 执行完毕,耗时1.73

liang9 执行完毕,耗时0.54

liang8 执行完毕,耗时1.28

liang7 执行完毕,耗时1.37

liang6 执行完毕,耗时1.88

----all_done----

函数解释:

apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表, kwds为传递给func的关键字参数列表;

apply(func[, args[, kwds]]):使用阻塞方式调用func

close():关闭Pool,使其不再接受新的任务;

terminate():不管任务是否完成,立即终止;

join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

执行说明:创建一个进程池pool,并设定进程的数量为5,range(10)会相继产生10个对象,10个对象被提交到pool中,因pool指定进程数为5,所以0、1、2、3、4会直接送到进程中执行,当其中一个执行完后才空出一个进程处理对象,继续去执行新的对象,所以会出现输出“liang5开始执行,进程号为7708”出现在"liang4 执行完毕,耗时0.57"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“----start----”,主程序在pool.join()处等待各个进程的结束。

例2:阻塞

from multiprocessing import Pool

import os, time, random

def worker(name):

t_start = time.time()

print("%s开始执行,进程号为%d" % (name, os.getpid()))

# random.random()随机生成0~1之间的浮点数

time.sleep(random.random() * 2)

t_stop = time.time()

print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))

def main():

po = Pool(3) # 定义一个进程池,最大进程数3

# 往进程池中添加任务

for i in range(0, 5):

# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))

# 每次循环将会用空闲出来的子进程去调用目标

po.apply(worker, (f'liang{i}',))

print("----start----")

po.close() # 关闭进程池,关闭后po不再接收新的请求

po.join() # 等待po中所有子进程执行完成,必须放在close语句之后

print("----all_done----")

if __name__ == '__main__':

main()

输出

liang0开始执行,进程号为1976

liang0 执行完毕,耗时1.75

liang1开始执行,进程号为12624

liang1 执行完毕,耗时0.57

liang2开始执行,进程号为12444

liang2 执行完毕,耗时0.52

liang3开始执行,进程号为1976

liang3 执行完毕,耗时1.23

liang4开始执行,进程号为12624

liang4 执行完毕,耗时0.85

----start----

----all_done----

因为是阻塞,主函数会等待进程的执行,执行完之后才会继续往下,所以运行完所有进程后才输出“----start----”

例3、使用进程池,并返回结果

from multiprocessing import Pool

import os, time, random

def worker(name):

print("%s开始执行,进程号为%d" % (name, os.getpid()))

# random.random()随机生成0~1之间的浮点数

time.sleep(random.random() * 2)

return name,os.getpid()

def main():

po = Pool(3) # 定义一个进程池,最大进程数3

res=[]

# 往进程池中添加任务

for i in range(0, 5):

# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))

# 每次循环将会用空闲出来的子进程去调用目标

res.append(po.apply_async(worker, (f'liang{i}',)))

print("----start----")

po.close() # 关闭进程池,关闭后po不再接收新的请求

po.join() # 等待po中所有子进程执行完成,必须放在close语句之后

for result in res:

print(result.get()) #get()函数得出每个返回结果的值

print("----all_done----")

if __name__ == '__main__':

main()

输出结果:

----start----

liang0开始执行,进程号为14012

liang1开始执行,进程号为13000

liang2开始执行,进程号为14120

liang3开始执行,进程号为14012

liang4开始执行,进程号为14012

('liang0', 14012)

('liang1', 13000)

('liang2', 14120)

('liang3', 14012)

('liang4', 14012)

----all_done----

例4、多进程执行多个任务

from multiprocessing import Pool

import os, time, random

def worker1(name):

print("%s开始执行work1,进程号为%d" % (name, os.getpid()))

# random.random()随机生成0~1之间的浮点数

time.sleep(random.random() * 2)

def worker2(name):

print("%s开始执行work2,进程号为%d" % (name, os.getpid()))

# random.random()随机生成0~1之间的浮点数

time.sleep(random.random() * 2)

def worker3(name):

print("%s开始执行work3,进程号为%d" % (name, os.getpid()))

# random.random()随机生成0~1之间的浮点数

time.sleep(random.random() * 2)

def main():

po = Pool(4) # 定义一个进程池,最大进程数3

work_list=[worker1,worker2,worker3]

# 往进程池中添加任务

for work in work_list:

for i in range(3):

po.apply_async(work, (f'liang{i}',))

print("----start----")

po.close() # 关闭进程池,关闭后po不再接收新的请求

po.join() # 等待po中所有子进程执行完成,必须放在close语句之后

print("----all_done----")

if __name__ == '__main__':

main()

线程池4个线程执行3个任务,每个任务执行3次。

输出:

----start----

liang0开始执行work1,进程号为13088

liang1开始执行work1,进程号为4908

liang2开始执行work1,进程号为4200

liang0开始执行work2,进程号为8124

liang1开始执行work2,进程号为4908

liang2开始执行work2,进程号为13088

liang0开始执行work3,进程号为8124

liang1开始执行work3,进程号为4200

liang2开始执行work3,进程号为4908

----all_done----

二、进程池进程之间的通讯

进程池中进程的通讯队列

from multiprocessing import Pool, Manager

q = Manager().Queue()

import os

import time

from multiprocessing import Pool, Manager

def work(name, q):

time.sleep(1)

print(f"{name}:---{os.getpid()}---{q.get()}")

def main():

# 创建一个用于进程池通信的队列

q = Manager().Queue()

for i in range(1000):

q.put(f'data-{i}')

# 创建一个拥有五个进程的进程池

po = Pool(5)

# 往进程池中添加20个任务

for i in range(20):

po.apply_async(work, (f'liang{i}', q))

# close:关闭进程池(进程池停止接收任务)

po.close()

# 主进程等待进程池中的任务结束再往下执行

po.join()

if __name__ == '__main__':

main()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/464914.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据集 结构化数据

数据集 一个数据的集和,每一行是一条数据记录,每一列是一个字段。 我们把表中的每一行叫做一个“记录”,每一个记录包含这行中的所有信息,就像在通讯录数据库中某个人全部的信息。字段是比记录更小的单位,字段集合组成…

STM32 ADC 单次模式、连续模式、扫描模式(转载)

ADC单通道: 要求进行一次ADC转换:配置为单次模式使能,扫描模式失能。这样ADC的这个通道,转换一次后,就停止转换。 要求进行连续ADC转换:配置为连续模式使能,扫描模式失能。这样ADC的这个通道&…

LeetCode:验证回文串【125】

LeetCode:验证回文串【125】 题目描述 给定一个字符串,验证它是否是回文串,只考虑字母和数字字符,可以忽略字母的大小写。 说明:本题中,我们将空字符串定义为有效的回文串。 示例 1: 输入: "A man, a …

一些好玩的创客玩件

桌面天气预报站:https://mc.dfrobot.com.cn/thread-311127-1-1.html

云数据中心网络遇到的问题_云数据中心面临安全问题,华为SDN解决方案有一个安全大脑...

CNET科技资讯网 9月23日 北京消息(文/周雅):当越来越多的企业开始采用云服务,安全问题往往成为待考虑的问题。在传统IT环境中,企业默认的逻辑架构是可信的,数据在自己手里,系统部署在自己的数据中心,有自己…

会唱歌的路

文 | 贰沐编辑 | 贰沐 子鱼会唱歌的路?!是什么意思?是说路会自己唱歌吗?开车行驶在普通的道路上,我们能够听到“嗡嗡”的各种杂乱无章的声音,而在有些特殊的路上,我们可以听到路面在发出有节奏的…

linux查看wifi信号命令_使用Nmcli命令从Linux终端连接WiFi

i使用Nmcli命令从Linux终端连接WiFi在linux系统中有几种用于管理无线网络接口的命令行工具。 其中的一些可用于简单查看无线网络接口状态(无论是启动还是关闭 ,或者是否连接到任何网络),如iw , iwlist , ip , ifconfig…

Modbus crc16校验

CRC-16 / MODBUS : 1)CRC寄存器初始值为 FFFF;即16位全为1; 2)CRC-16 / MODBUS的多项式A001H (1010 0000 0000 0001B) ‘H’表示16进制数,‘B’表示二进制数 计算步骤为: (1).预置 16 位寄存…

我的前同事,阿里大牛的技术感悟

以下内容转自前同事现阿里技术大牛-王怀利——现在想想,从业十年了,现在做的活,都不如我大学的时候做的项目,那么具有“技术含量”和挑战。一个是,我用最便宜的12M单片机开发的计时算法,帮老师赚了一笔钱。…

算法熟记-排序系列-堆排序

1. 简述 假设待排序数组为 int array[], 数组长度为n。 主要是利用堆的性质。对于升序排序,使用最大堆。 首先,建堆,使用递归后根序遍历得方法,通过交换元素,保证根元素比孩子元素大。 第1趟,堆顶…

oracle入库的速度能到多少_多线程能提高Oracle的入库速度吗

多线程能提高Oracle的入库速度吗最近常常和同事们讨论“系统架构”,其中有不免提到如何使用“多线程”来改善系统性能。有些同事普遍有一种“认为”:他们认为“多线程”是改善系统性能的“灵丹妙药”,他们简单的认为,“多线程”导…

数据结构-- 线性表之链式存储

https://www.cnblogs.com/ZWOLF/p/10604252.html

那些年,我和发哥在恒大的日子

在广州上班那会,我们在恒大中心旁边的利通大厦上班,我和薛总每天一起上下班,那时候宿舍还有盼盼,有时候玩开心的时候,我就会跟他们说,等过了很多年后,我们要写一本说,书的名字就叫做…

十六进制转化为十进制

package lsh.element.numbersystem;import java.util.Scanner;/*** * desc 有意思的地方:两种思想得到的结果都是正确的,但是超出int类型最大之后,错误值却不同* * author * LSH* 2018年9月23日*/ public class HexToDecimalConver…

回来了

三年了 又回来了 未来去哪里转载于:https://blog.51cto.com/itcnjd/589429

KEIL高级调试——条件断点

在线调试程序时,打断点是非常有效的一种方式,配合单步调试,可以快速定位问题。但是有时候,手动打断点用起来不是那么方便。比如想要在一个循环的第N次停下来,如果手动打断点,那就要不停的点击单步运行&…

emailjava中怎么校验_Java使用注解实现参数统一校验功能

在项目开发中,当使用配置文件的时候,需要对一些配置参数进行合法校验,如果不存在则会抛出异常或者提醒用户重新修改配置文件后运行系统。 以前的做法就是读取到配置文件后,每个配置项挨个检查,写多个if判断是否存在问题…

原来保险丝熔断原理是这样的

如果电路中的保险丝熔断了,想亮起一个灯来指示,可以考虑用这个电路:这个电路的工作逻辑:当保险丝F1正常工作时,只亮起绿灯LED2。当保险丝F1熔断时,熄灭绿灯LED2,亮起红灯LED1,告诉人…

redhat6 使用raid5的系统安装

raid5安装步骤(有三个磁盘,分别是sda/sdb/sdc) 1.独立给/boot创建一个分区(可以在创建software raid前后创建,但是不能加入raid software,例如:一个磁盘为sda,那我们把sda1作为/boot的独立分区&…

docker 定时重启脚本_使用 Go 添加启动脚本

简介实践困惑总结当前部分的代码简介虽然 Makefile 能很好的整合各种命令, 是一个非常方便的工具. 但启动脚本也是必不可少的, Makefile 更多用于开发阶段, 比如编译, 单元测试等流程.启动脚本的作用是控制程序的状态, 管理程序的启动, 停止, 查询运行状态等.实践直接上脚本了:…