python线程池提交任务

1. 线程池参数设置

  1. CPU数量:N
  2. 线程池的核心线程数量
    IO密集型的话,一般设置为 2 * N + 1
    CPU密集型的话,一般设置为 N + 1 或者 使用进程池。
  3. 线程池的最大任务队列长度
    (线程池的核心线程数 / 单个任务的执行时间)* 2
    如果线程池有10个核心线程,单个任务的执行时间为0.1s,那么最大任务队列长度设置为200。
from concurrent.futures import ThreadPoolExecutor
thread_pool = ThreadPoolExecutor(max_workers=10)

2. submit方式提交

submit 这种提交方式是一条一条地提交任务:
1. 可以提交不同的任务函数;
2. 线程池的线程在执行任务时出现异常,程序不会停止,而且也看不到对应的报错信息;
3. 得到的结果是乱序的。

import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef run_task(delay):print(f"------------> start to execute task {delay} <------------")time.sleep(delay)print(f"------------> task {delay} execute over !!! <------------")return delay + 10000task_params = [1, 4, 2, 5, 3, 6] * 10
threadpool_max_worker = 10      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)############################### 方式1. 虽然是异步提交任务,但是却是同步执行任务。
for p in task_params:future = thread_pool.submit(run_task, p)print(future.result())      # 直接阻塞当前线程,直到任务完成并返回结果,即变成同步############################### 方式2. 异步提交任务,而且异步执行任务,乱序执行,结果乱序。
future_list = []
for p in task_params:future = thread_pool.submit(run_task, p)future_list.append(future)for res in as_completed(future_list):       # 等待子线程执行完毕,先完成的会先打印出来结果,结果是无序的print(f"get last result is {res.result()}")

3. map方式提交

map 这种提交方式可以分批次提交任务:

  1. 每个批次提价的任务函数都相同;
  2. 线程池的线程在执行任务时出现异常,程序终止并打印报错信息;
  3. 得到的结果是有序的。
import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef run_task(delay):print(f"------------> start to execute task {delay} <------------")time.sleep(delay)print(f"------------> task {delay} execute over !!! <------------")return delay + 10000task_params = [1, 4, 2, 5, 3, 6] * 10
threadpool_max_worker = 5      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)task_res = thread_pool.map(run_task, task_params)		# 批量提交任务,乱序执行
print(f"main thread run finished!")
for res in task_res:		# 虽然任务是乱序执行的,但是得到的结果却是有序的。print(f"get last result is {res}")

4. 防止一次性提交的任务量过多

import time
from concurrent.futures import ThreadPoolExecutordef run_task(delay):print(f"------------> start to execute task <------------")time.sleep(delay)print(f"------------> task execute over !!! <------------")task_params = [1, 4, 2, 5, 3, 6] * 100
threadpool_max_worker = 10      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)
threadpool_max_queue_size = 200     # 线程池任务队列长度一般设置为  (线程池核心线程数/单个任务执行时间)* 2for p in task_params:print(f"*****************> 1. current queue size of thread pool is {thread_pool._work_queue.qsize()}")while thread_pool._work_queue.qsize() >= threadpool_max_queue_size:time.sleep(1)		# sleep时间要超过单个任务的执行时间print(f"*****************> 2. current queue size of thread pool is {thread_pool._work_queue.qsize()}")thread_pool.submit(run_task, p)print(f"main thread run finished!")

5. 案例分享

案例背景:由于kafka一个topic的一个分区数据只能由一个消费者组中的一个消费者消费,所以现在使用线程池,从kafka里消费某一个分区的数据,将数据提取出来并存于mysql或者redis,然后手动提交offset。

import time
import queue
import concurrent.futures
from threading import Thread
from concurrent.futures import ThreadPoolExecutordef send_task_to_queue(q, params):for idx, p in enumerate(params):q.put((idx, p))     # 把kafka数据put到queue里,如果queue满了就先阻塞着,等待第15行get数据后腾出空间,这里继续put数据print(f"\n set p: {p} into task queue, queue size is {q.qsize()}")def run_task(param_queue):idx, p = param_queue.get()      # 这里一直get数据,即使queue空了,只要kafka持续产生数据,第10行就会持续put数据到queue里print(f"\n ------------> start to execute task {idx} <------------")time.sleep(p)print(f"\n ------------> task {idx} execute over !!! <------------")return idxtask_params = [1, 4, 2, 5, 3] * 20      # 数据模拟kafka中消费得到的数据
thread_pool = ThreadPoolExecutor(max_workers=10)
task_param_queue = queue.Queue(maxsize=10)# 这里启动一个子线程一直往queue里put数据
thread_send_task = Thread(target=send_task_to_queue, args=(task_param_queue, task_params))
thread_send_task.start()while True:     # 这里为什么一直死循环:只要生产者生产数据存储在kafka中,那么消费者就一直能获取到数据future_list = []# 分批去消费queue里的数据for i in range(10):# 这里的子线程从queue里get任务后,queue腾出空间,上面的子线程继续往里面put数据future_list.append(thread_pool.submit(run_task, task_param_queue))# 子线程任务执行结束后,从结果里取最大的索引值,可以用于redis记录,并用户手动提交offset...complete_res, uncomplete_res = concurrent.futures.wait(future_list)future_max_idx = max([future_complete.result() for future_complete in complete_res])print(f"\n ######################################## every batch's max idx is {future_max_idx}")...		# 自行使用 future_max_idx 这个值做处理...

6. 案例优化

上面的案例,是将数据存储于线程队列中,保证每个子线程get()到的数据不重复。
既然线程队列可以保证每个子线程get到的数据不重复,那么利用生成器的一次性特性(使用完一次就没了),是不是也能达到这个效果呢?
试着优化下:

import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutordef run_task(param_generator):try:idx, p = next(param_generator)print(f"\n ------------> start to execute task idx {idx} <------------")time.sleep(p)print(f"\n ------------> task value {p} execute over !!! <------------")return idxexcept StopIteration:return -1# 这里将task_params变成生成器,使用生成器的一次性特性:消费完一次后数据就消失了
task_params_generator = ((idx, val) for idx, val in enumerate([1, 4, 2, 5, 3] * 20))
thread_pool = ThreadPoolExecutor(max_workers=10)while True:     # 这里为什么一直死循环:只要生产者生产数据存储在kafka中,那么消费者就一直能获取到数据future_list = []# 分批去消费生成器中的数据for i in range(10):# 这里的子线程从生成器中消费数据future_list.append(thread_pool.submit(run_task, task_params_generator))# 子线程任务执行结束后,从结果里取最大的索引用于redis记录,并手动提交offsetcomplete_res, uncomplete_res = concurrent.futures.wait(future_list)future_max_idx = max([future_complete.result() for future_complete in complete_res])print(f"\n ######################################## every batch's max idx is {future_max_idx}")if future_max_idx == -1:        # 如果为-1,说明生成器的数据已经迭代完了,等待kafka新生成数据print(f"\n generator has empty !!!!!")time.sleep(60)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/611845.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 84:柱状图中的最大矩形

一、题目描述 给定 n 个非负整数&#xff0c;用来表示柱状图中各个柱子的高度。每个柱子彼此相邻&#xff0c;且宽度为 1 。 求在该柱状图中&#xff0c;能够勾勒出来的矩形的最大面积。 示例 1: 输入&#xff1a;heights [2,1,5,6,2,3] 输出&#xff1a;10 解释&#xff1a…

Jmeter+ant+Jenkins 接口自动化框架完整版

接口自动化测试单有脚本是不够的&#xff0c;我们还需要批量跑指定接口&#xff0c;生成接口运行报告&#xff0c;定位报错接口&#xff0c;接口定时任务&#xff0c;邮件通知等功能。批量跑指定接口&#xff1a;我们可以利用ant批量跑指定目录下的Jmeter脚本生成接口运行报告&…

vue3基础类型和引用类型,和store的使用

案例一&#xff1a; 如果我在store创建一个变量&#xff0c;是读取缓存key为name的数据&#xff0c; store.name 默认值是张三 # 声明一个变量 const title ref(store.name) # 然后修改title.value "李四"&#xff0c; # 问&#xff1a;打印store.name&#xff0…

怎么投稿各大媒体网站?

怎么投稿各大媒体网站&#xff1f;这是很多写作者及自媒体从业者经常面临的问题。在信息爆炸的时代&#xff0c;如何将自己的文章推送到广大读者面前&#xff0c;成为了一个不可避免的挑战。本文将为大家介绍一种简单有效的投稿方法——媒介库发稿平台发稿&#xff0c;帮助大家…

5,sharding-jdbc入门-sharding-jdbc广播表

执行sql #在数据库 user_db、order_db_1、order_db_2中均要建表 CREATE TABLE t_dict (dict_id BIGINT (20) NOT NULL COMMENT 字典id,type VARCHAR (50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 字典类型,code VARCHAR (50) CHARACTER SET utf8 COLLAT…

国产AI工具钉钉AI助理:开启个性化助手服务的新篇章

钉钉AI助理是钉钉平台的一项功能&#xff0c;它可以根据用户的需求提供个性化的AI助手服务。用户可以在AI助理页面一键创建个性化的AI助理&#xff0c;如个人的工作AI助理、旅游AI助理、资讯AI助理等。企业也可以充分使用企业所沉淀的知识库和业务数据&#xff0c;在获得授权后…

C#入门篇(一)

变量 顾名思义就是变化的容器&#xff0c;即可以用来存放各种不同类型数值的一个容器 折叠代码 第一步&#xff1a;#region 第二步&#xff1a;按tab键 14种数据类型 有符号的数据类型 sbyte&#xff1a;-128~127 short&#xff1a;-32768~32767 int&#xff1a;-21亿多~21亿多…

pom依赖相关

一、创建可执行jars&#xff1a; 1、可执行jars&#xff1a;可执行 jars&#xff08;有时称为“fat jars”&#xff09;是包含已编译类以及代码需要运行的所有 jar 依赖项的存档。 2、方法&#xff1a;pom中添加spring-boot-maven-plugin&#xff0c;可将项目打包成可执行jar…

CHS_01.2.1.1+2.1.3+进程的概念、组成、特征

CHS_01.2.1.12.1.3进程的概念、组成、特征 进程进程的概念 进程的组成——PCB进程的组成——PCB进程的组成——程序段、数据段知识滚雪球&#xff1a;程序是如何运行的&#xff1f;进程的组成进程的特征 知识回顾与重要考点 从这个小节开始 我们会正式进入第二章处理机管理相关…

封装动画函数

文章目录 需求分析确定参数确定属性值具体实现简单扩展 需求分析 在 css 中&#xff0c;如果要给一个元素设置动画&#xff0c;就要改变一个css属性&#xff0c;也是一个值到另外一个值的变化&#xff0c;但是放入到我们这里的动画函数里面&#xff0c;我是不知道是具体要用到…

STK 特定问题建模(五)频谱分析(第二部分)

文章目录 简介三、链路分析3.1 星地链路干扰分析3.2 频谱分析 简介 本篇对卫星通信中的频谱利用率、潜在干扰对频谱的影响进行分析&#xff0c;以LEO卫星信号对GEO通信链路影响为例&#xff0c;分析星地链路频谱。 建模将从以下几个部分开展&#xff1a; 1、GEO星地通信收发机…

Java接口的解析

在 Java 中&#xff0c;接口&#xff08;Interface&#xff09;是一种抽象类型&#xff0c;用于定义一组相关方法的契约。接口只包含方法的签名&#xff0c;而没有方法的实现。实现接口的类必须提供接口中定义的方法的具体实现。 以下是对 Java 接口的解析&#xff1a; 这只是…

springboot常用扩展点

当涉及到Spring Boot的扩展和自定义时&#xff0c;Spring Boot提供了一些扩展点&#xff0c;使开发人员可以根据自己的需求轻松地扩展和定制Spring Boot的行为。本篇博客将介绍几个常用的Spring Boot扩展点&#xff0c;并提供相应的代码示例。 1. 自定义Starter(面试常问) Sp…

使用Scikit Learn 进行识别手写数字

使用Scikit Learn 进行识别手写数字 作者&#xff1a;i阿极 作者简介&#xff1a;数据分析领域优质创作者、多项比赛获奖者&#xff1a;博主个人首页 &#x1f60a;&#x1f60a;&#x1f60a;如果觉得文章不错或能帮助到你学习&#xff0c;可以点赞&#x1f44d;收藏&#x1f…

MySql -数据库进阶

一、约束 1.外键约束 外键约束概念 让表和表之间产生关系&#xff0c;从而保证数据的准确性&#xff01; 建表时添加外键约束 为什么要有外键约束 -- 创建db2数据库 CREATE DATABASE db2; -- 使用db2数据库 USE db2;-- 创建user用户表 CREATE TABLE USER(id INT PRIMARY KEY …

2024-01-09 Android.mk 根据c文件名插入特定的宏定义,我这里用于定义log LOG_TAG 标签

一、在Android的构建系统中&#xff0c;使用Android.mk构建脚本可以根据特定需求来定义宏。如果你想根据C文件的名称来插入特定的宏定义&#xff0c;可以使用条件语句检查文件名&#xff0c;并相应地设置宏。 在Android的构建系统中&#xff0c;使用Android.mk构建脚本可以根据…

利用邮件发送附件来实现一键巡检,附件是通过调用zabbix api生成的word和Excel

HTML部分&#xff1a; <!DOCTYPE html> <html> <head><title>自动巡检</title><!-- 加入CSS样式 --> </head> <body><form id"inspectionForm"><label for"email">邮箱地址:</label>&…

图片分类的脚本

当前有个名为“image”的文件夹和名为“label”的txt文件&#xff0c;txt文件里的每一行包含了“photos”文件夹里每一个图片文件的文件名 一个空格 对应的标签&#xff08;1、2....8&#xff09;&#xff0c;请编写一个脚本&#xff0c;并创建一个新的文件夹&#xff0c;里面…

【MySQL】表设计与范式设计

文章目录 一、数据库表设计一对一一对多多对多 二、范式设计第一范式第二范式第三范式BC范式第四范式 一、数据库表设计 一对一 举个例子&#xff0c;比如这里有两张表&#xff0c;用户User表 和 身份信息Info表。 因为一个用户只能有一个身份信息&#xff0c;所以User表和In…

jmeter+ant+Jenkins集成

一、 环境准备 1、Jenkins下载&#xff1a;https://jenkins.io/zh/download/ 2、 Jenkins安装&#xff1a;解压下载的压缩包&#xff0c;直接点击msi文件安装即可 4、 Jenkins登录用户设置&#xff1a;装&#xff1a; 浏览器地址栏中输入&#xff1a;http://localhost:8080/…