Python 多线程、线程池、进程池

线程间的通讯机制

消息队列

event 事件对象

当线程创建完成之后,并不会马上执行线程,而是等待某一事件发生,线程才会启动

import threading# # 创建 event 对象
# event = threading.Event()
# # 重置代码中的 event 对象,使得所有该event事件都处于待命状态
# event.clear()
# # 阻塞线程,等待 event 指令
# event.wait()
# # 发送 event 指令,使得所有设置该 event 事件的线程执行
# event.set()class MyThreading(threading.Thread):def __init__(self, event):super().__init__()self.event = eventdef run(self):print('线程{}已经初始化完成,随时准备启动...'.format(self.name))# 阻塞线程,让线程等待指令后再启动self.event.wait()print('{}开始执行...'.format(self.name))if __name__ == '__main__':event = threading.Event()# 创建 10 个自定义线程对象并放入列表threads = [MyThreading(event) for i in range(10)]# 重置代码中的 event 对象,使得所有该event事件都处于待命状态event.clear()# 执行线程# 执行到 run 方法中 self.event.wait() 位置,即打印了:线程{}已经初始化完成...[t.start() for t in threads]# 发送 event 指令,使得所有设置该 event 事件的线程执行# 即启动 threads 列表中的所有线程# 接着执行 run 方法中 self.event.wait() 后面的代码,即打印了:{}开始执行...event.set()[t.join() for t in threads]

condition 条件对象

import threading# condition 对象适用于线程轮流执行,或一个线程等待另一个线程的情况,如两个人的对话等# 创建 condition 对象
cond = threading.Condition()class ThreadA(threading.Thread):def __init__(self, cond, name):super().__init__(name=name)self.cond = conddef run(self):# 获取锁self.cond.acquire()# 线程A说了第一句话print(self.getName(), ':一二三四五')# 唤醒其他处于 wait 状态的线程(通知线程B可以说话了)self.cond.notify()# 线程A进入 wait 状态,等待线程B通知(唤醒)self.cond.wait()# 被线程A唤醒后说了第二句话print(self.name, ':山无棱,天地合,乃敢与君绝')self.cond.notify()  # 通知线程Bself.cond.wait()  # 等待线程B通知# 被线程A唤醒后说了第三句话,最后一句话print(self.name, ':有钱吗?借点')self.cond.notify()  # 通知线程Bself.cond.release()  # 释放锁class ThreadB(threading.Thread):def __init__(self, cond, name):super().__init__(name=name)self.cond = conddef run(self):# 获取锁self.cond.acquire()self.cond.wait()  # 由于它不是第一个说话的人,所以一开始等待通知# 线程B说了第一句话print(self.getName(), ':上山打老虎')# 唤醒其他处于 wait 状态的线程(通知线程A可以说话了)self.cond.notify()# 线程B进入 wait 状态,等待线程A通知(唤醒)self.cond.wait()# 被线程B唤醒后说了第二句话print(self.name, ':海可枯,石可烂,激情永不散')self.cond.notify()  # 通知线程Aself.cond.wait()  # 等待线程A通知# 被线程B唤醒后说了第三句话,最后一句话print(self.name, ':没有,滚')# self.cond.notify()  # 已经是最后一句话,不需要通知线程Aself.cond.release()  # 释放锁if __name__ == '__main__':a = ThreadA(cond, 'AAA')b = ThreadB(cond, 'BBB')# 线程A先说话,但是不能先启动线程A# 因为如果启动了线程A,然后线程A说完第一句话后,通知线程B# 但是此时线程B没有启动,就接收不了A的通知,B就会一直处于 wait 状态,即说不了话,也通知不了A# A等不到B的通知,也会一直处于 wait 状态# a.start()# b.start()b.start()a.start()

线程间的消息隔离机制

使用场景

在使用多线程的过程中,会有一种变量的使用场景: 一个变量会被所有的线程使用,但是每个线程都会对该变量设置不同的值, threading.local() 提供了这种变量

使用方法

"""
在使用多线程的过程中,会有一种变量的使用场景:一个变量会被所有的线程使用,但是每个线程都会对该变量设置不同的值threading.local() 提供了这种变量假设有一个场景:设置一个 threading.local 变量,然后新建两个线程分别设置这两个 threading.local 的值再分别打印这两个 threading.local 的值看每个线程打印出来的 threading.local 值是否不一样
"""
import threading# local_data 实际上是一个对象
local_data = threading.local()
# 设置 local_data 的名字
local_data.name = 'local_data'class MyThread(threading.Thread):def run(self):print('赋值前-子线程:', threading.currentThread(), local_data.__dict__)# 在子线程中修改 local_data.name 的值local_data.name = self.getName()print('赋值后-子线程:', threading.currentThread(), local_data.__dict__)if __name__ == '__main__':print('开始前-主线程:', local_data.__dict__)t1 = MyThread()t1.start()t1.join()t2 = MyThread()t2.start()t2.join()print('结束后-主线程:', local_data.__dict__)"""
输出结果:
开始前-主线程: {'name': 'local_data'}
赋值前-子线程: <MyThread(Thread-1, started 8480)> {}
赋值后-子线程: <MyThread(Thread-1, started 8480)> {'name': 'Thread-1'}
赋值前-子线程: <MyThread(Thread-2, started 2572)> {}
赋值后-子线程: <MyThread(Thread-2, started 2572)> {'name': 'Thread-2'}
结束后-主线程: {'name': 'local_data'}
"""

线程池

线程池中存放多个线程,当有业务需要线程来执行时,可以直接从线程池中获取一个线程来执行该业务, 业务执行完毕之后,线程不会释放,而是被放回线程池中,从而节省了线程的创建以及销毁的时间。 Python concurrent.futures 模块中的 ThreadPoolExecutor 就提供了线程池,该线程池有以下特点:

  • 主线程可以获取某一个线程或任务的状态,以及返回值
  • 当一个线程完成的时候,主线程能够立即知道
  • 让多线程和多进程的编码接口一致

线程池的简单应用

from concurrent.futures import ThreadPoolExecutor
import time# 创建线程池对象,并指定线程池中最大的线程数为 3
# 当业务数不超过 3 的时候,ThreadPoolExecutor 就会创建一个新的线程来执行业务
# 当超过 3 时,ThreadPoolExecutor 不会创建新的线程,而是等待执行其他业务的线程执行完毕后返回
# 再将返回的线程分配给需要的业务
executor = ThreadPoolExecutor(max_workers=3)# 定义一个业务
# 假设这里模拟一个爬虫,爬取一个网页页面
def get_html(timers):time.sleep(timers)  # 模拟耗时操作print('获取网页信息{}完毕'.format(timers))return timers# 提交要执行的函数,即要完成的业务到线程池中,然后线程池就会自动分配线程去完成对应的业务
# submit 方法会立即返回,不会阻塞主线程
# get_html 的参数放在后面,即 1 会作为参数传递给 get_html() 中的 timers
# 以下创建了四个任务
task1 = executor.submit(get_html, 1)
task2 = executor.submit(get_html, 2)
task3 = executor.submit(get_html, 3)
task4 = executor.submit(get_html, 4)bool1 = task1.done()  # 检查任务是否完成,完成返回 True
bool2 = task2.cancel()  # 取消任务执行,只有该任务没有被放入线程池中才能取消成功,成功返回 True# 拿到任务执行的结果,如 get_html 的返回值
# timeout 参数用于设置等待结果的最长等待时间,单位为秒
# result 方法是一个阻塞方法
timers = task3.result(timeout=10)
print(timers)
print(111)

线程池中常用的方法

  • as_complete
# 线程池的简单应用
from concurrent.futures import ThreadPoolExecutor, as_completed
import time# 创建线程池对象,并指定线程池中最大的线程数为 3
# 当业务数不超过 3 的时候,ThreadPoolExecutor 就会创建一个新的线程来执行业务
# 当超过 3 时,ThreadPoolExecutor 不会创建新的线程,而是等待执行其他业务的线程执行完毕后返回
# 再将返回的线程分配给需要的业务
executor = ThreadPoolExecutor(max_workers=3)# 定义一个业务
# 假设这里模拟一个爬虫,爬取一个网页页面
def get_html(timers):time.sleep(timers)  # 模拟耗时操作print('获取网页信息{}完毕'.format(timers))return timers# 模拟要爬取的 url
urls = [1, 2, 3]
# 通过列表推导式构造多线程任务
all_tasks = [executor.submit(get_html, url) for url in urls]
# as_completed 接收一个可迭代对象
# as_completed 是一个生成器,当任务没有完成时,它会阻塞,只有当任务结束返回结果时才会继续往下执行
# as_completed 函数的作用:拿到所有任务执行完毕之后的结果
# 不需要我们手动调用 done 方法不停地判断任务是否完成
for item in as_completed(all_tasks):data = item.result()print('主线程中获取任务的返回值是{}'.format(data))
"""
执行结果:
获取网页信息1完毕
主线程中获取任务的返回值是1
获取网页信息2完毕
主线程中获取任务的返回值是2
获取网页信息3完毕
主线程中获取任务的返回值是3
"""
  • map
from concurrent.futures import ThreadPoolExecutor
import time# 创建线程池对象,并指定线程池中最大的线程数为 3
# 当业务数不超过 3 的时候,ThreadPoolExecutor 就会创建一个新的线程来执行业务
# 当超过 3 时,ThreadPoolExecutor 不会创建新的线程,而是等待执行其他业务的线程执行完毕后返回
# 再将返回的线程分配给需要的业务
executor = ThreadPoolExecutor(max_workers=3)# 定义一个业务
# 假设这里模拟一个爬虫,爬取一个网页页面
def get_html(timers):time.sleep(timers)  # 模拟耗时操作print('获取网页信息{}完毕'.format(timers))return timers# 模拟要爬取的 url
urls = [4, 2, 3]# map 方法和 as_complete 类似
# map 也是一个生成器,当任务没有完成时,它会阻塞,只有当任务结束返回结果时才会继续往下执行
# map 会自动映射 urls 中的每一个元素传递给 get_html 函数,并自动提交 ,不需要通过 submit 方法提交任务
# map 方法直接拿到任务执行的结果
# as_complete 和 map 都可以拿到线程池中各个线程执行的结果,但有以下区别:
# as_complete 会根据任务完成的快慢得到结果,即哪个任务先完成就会先得到该任务的结果
# 而 map 会严格按照任务的顺序得到结果,比如按照 urls 列表中的映射顺序得到对应的结果
# 所以两种适用于不同的场景
for data in executor.map(get_html, urls):print('主线程中获取任务的返回值是{}'.format(data))
"""
获取网页信息2完毕
获取网页信息3完毕
获取网页信息4完毕
主线程中获取任务的返回值是4
主线程中获取任务的返回值是2
主线程中获取任务的返回值是3
"""
  • wait
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time# 创建线程池对象,并指定线程池中最大的线程数为 3
# 当业务数不超过 3 的时候,ThreadPoolExecutor 就会创建一个新的线程来执行业务
# 当超过 3 时,ThreadPoolExecutor 不会创建新的线程,而是等待执行其他业务的线程执行完毕后返回
# 再将返回的线程分配给需要的业务
executor = ThreadPoolExecutor(max_workers=3)# 定义一个业务
# 假设这里模拟一个爬虫,爬取一个网页页面
def get_html(timers):time.sleep(timers)  # 模拟耗时操作print('获取网页信息{}完毕'.format(timers))return timers# 模拟要爬取的 url
urls = [4, 2, 3]all_tasks = [executor.submit(get_html, url) for url in urls]# 让主线程阻塞,直到参数里的条件成立
# 根据 wait 函数的参数,条件成立的情况是:所有任务执行完毕
# ALL_COMPLETED 表示所有任务都执行完成
# 还有其他的参数,如 FIRST_COMPLETED 表示只要有一个任务完成就条件成立
wait(all_tasks, return_when=FIRST_COMPLETED)# 如果想等代码执行完毕之后再打印下列语句,可以使用 wait 语句
print('代码执行完毕')

进程池

使用 concurrent.future 模块提供的 ProcessPoolExecutor 来实现进程池,用法和线程池完全一致,参考上述线程池的使用(建议使用该种方式使用进程池)

下面是基于 Pool 类实现的进程池的使用

import multiprocessing
import time# 定义一个业务
# 假设这里模拟一个爬虫,爬取一个网页页面
def get_html(n):time.sleep(n)  # 模拟耗时操作print('子进程{}获取内容成功'.format(n))return nif __name__ == '__main__':# 设置进程数,一般设置为和 CPU 数量一致的比较合理# multiprocessing.cpu_count() 获取当前主机的 CPU 核心数pool = multiprocessing.Pool(multiprocessing.cpu_count())# apply_async 是一个异步方法# apply 是一个同步方法# 作用类似于 submit 方法result = pool.apply_async(get_html, args=(2,))pool.close()  # 必须在 join 方法前调用,否则会抛出异常# join 方法会等待所有的子进程执行完毕之后,才会继续往下执行主进程的代码# 即 join 会阻塞主进程代码pool.join()# result.get() 拿到子进程执行结果, get 方法是一个阻塞方法print(result.get())print('end...')print()# map 方法的使用pool = multiprocessing.Pool(multiprocessing.cpu_count())# imap 方法会按照列表顺序输出# imap_unordered 方法则不会按照列表顺序执行,而是按照任务执行的快慢输出for result in pool.imap(get_html, [1, 2, 3]):print('{}休眠执行成功'.format(result))

线程同步信号量(semaphore)的使用

同步信号量的作用是用于控制同时工作的线程数量,如读文件时只能同时允许两个线程读,在爬虫时控制同时爬虫的线程,防止触发网站反扒机制

import threading
import time# 还是模拟一个爬虫
# HtmlSpider 类负责根据给定的 URL 去爬取网页内容
class HtmlSpider(threading.Thread):def __init__(self, url, sem):super().__init__()self.url = urlself.sem = semdef run(self):time.sleep(2)print('网页内容获取完成')self.sem.release()  # 线程完成任务,释放锁# UrlProducer类负责给 HtmlSpider 类提供网页的 URL
class UrlProducer(threading.Thread):def __init__(self, sem):super().__init__()self.sem = semdef run(self):for i in range(10):self.sem.acquire()  # 获取锁,获取成功才能执行线程html_thread = HtmlSpider('url{}'.format(i), self.sem)  # 创建HtmlSpider线程html_thread.start()  # 启动线程if __name__ == '__main__':# 创建线程同步信号量# 参数 value 指定允许同时工作的线程数sem = threading.Semaphore(value=3)url_producer = UrlProducer(sem)url_producer.start()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/77729.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

命令行git联网失败,但是实际可以联网

最近下载代码的时候发现总是告诉我连不上github的网页&#xff0c;但是我自己通过浏览器又可以上网&#xff0c;找了半天发现这个方法可以。 记录下这个代理 打开git bash 执行以下命令&#xff1a; git config --global http.proxy http://127.0.0.1:7890 git config --glob…

Python 图形化界面基础篇:添加复选框( Checkbutton )到 Tkinter 窗口

Python 图形化界面基础篇&#xff1a;添加复选框&#xff08; Checkbutton &#xff09;到 Tkinter 窗口 引言什么是 Tkinter 复选框&#xff08; Checkbutton &#xff09;&#xff1f;步骤1&#xff1a;导入 Tkinter 模块步骤2&#xff1a;创建 Tkinter 窗口步骤3&#xff1a…

【C++】封装unordered_map和unordered_set(用哈希桶实现)

前言&#xff1a; 前面我们学习了unordered_map和unordered_set容器&#xff0c;比较了他们和map、set的查找效率&#xff0c;我们发现他们的效率比map、set高&#xff0c;进而我们研究他们的底层是由哈希实现。哈希是一种直接映射的方式&#xff0c;所以查找的效率很快…

LeetCode——动态规划篇(一)

刷题顺序及思路来源于代码随想录&#xff0c;网站地址&#xff1a;https://programmercarl.com 目录 509. 斐波那契数 - 力扣&#xff08;LeetCode&#xff09; 70. 爬楼梯 - 力扣&#xff08;LeetCode&#xff09; 746. 使用最小花费爬楼梯 - 力扣&#xff08;LeetCode&a…

瞄准办公场景,未来智能靠“AI+耳机”后来居上?

如何在广阔红海中开拓出蓝海&#xff1f;未来智能或可作为参考案例。 作为TWS耳机玩家&#xff0c;未来智能成立于2021年&#xff0c;日前完成了由天际资本领投的数千万元Pre-A轮融资&#xff0c;这也是该公司成立以来完成的第二轮融资。 从成立时间来看&#xff0c;在广阔的…

Linux---应用层获取usb设备描述信息通过endpoint地址数据通讯

文章目录 &#x1f308;应用层获取USB设备信息总体思路&#x1f308;应用层代码实例&#x1f308;实例测试&#x1f308;应用层通过endpoint进行数据读写 &#x1f308;应用层获取USB设备信息总体思路 应用层可以打开USB设备的节点&#xff0c;读取包括USB设备的配置&#xff…

deepspeed训练报错torch.distributed.elastic.multiprocessing.errors.ChildFailedError

测试场景&#xff1a;使用deepspeed框架训练gpt模型 问题&#xff1a; 报错torch.distributed.elastic.multiprocessing.errors.ChildFailedError 具体见截图&#xff1a;

微信小程序连接标签打印机,打印标签

授权蓝牙并且搜索蓝牙设备 // 授权定位和蓝牙 authorizationBluetooth() {wx.getSetting({success: (res) > {if (res.authSetting.hasOwnProperty(scope.bluetooth)) {if (!res.authSetting[scope.bluetooth]) {this.setData({showManual: true})} else {this.bluetoothSt…

UMA 2 - Unity Multipurpose Avatar☀️六.Advanced Occlusion高级遮挡功能解决皮肤服饰穿模

文章目录 🟥 本节功能效果展示🟧 基础项目配置🟨 本节项目配置🟩 配置MeshHideAsset1️⃣ 创建MeshHideAsset2️⃣ 配置SlotDataAsset3️⃣ 配置遮挡信息🟦 将 MeshHideAsset 配置到 Recipe🟥 本节功能效果展示 未遮挡前的穿模问题: 遮挡后效果:

vue3项目运行时解析文件的顺序

在 Vue 3 项目运行时&#xff0c;文件的解析顺序如下&#xff1a; 首先&#xff0c;Vue 3 会解析并执行 main.js 文件。这是整个应用程序的入口文件。通常&#xff0c;在 main.js 中创建 Vue 应用实例&#xff0c;并将根组件挂载到指定的 DOM 元素上。 在 main.js 中&#xff…

Flink中的批和流

批处理的特点是有界、持久、大量&#xff0c;非常适合需要访问全部记录才能完成的计算工作&#xff0c;一般用于离线统计。 流处理的特点是无界、实时, 无需针对整个数据集执行操作&#xff0c;而是对通过系统传输的每个数据项执行操作&#xff0c;一般用于实时统计。 而在Flin…

厂商征集 | 2023年中国RPA市场洞察研究报告正式启动

RPA中国基于在科技行业的资源积累&#xff0c;以及对各领域「技术领导者」、「技术应用者」、「产品服务商」的深度调研&#xff0c;2023年&#xff0c;我们重点推出MI报告 ( Market Insight )、CI Vendor报告&#xff08;Comprehensive Influence Vendor&#xff09;两个系列。…

【Jmeter】什么是BeanShell?

一、什么是BeanShell&#xff1f; BeanShell是用Java写成的,一个小型的、免费的、可以下载的、嵌入式的Java源代码解释器&#xff0c;JMeter性能测试工具也充分接纳了BeanShell解释器&#xff0c;封装成了可配置的BeanShell前置和后置处理器&#xff0c;分别是 BeanShell Pre…

NoSQL之Redis配置与优化(一)

关系数据库与非关系型数据库 &#xff1a; ●关系型数据库&#xff1a; 关系型数据库是一个结构化的数据库&#xff0c;创建在关系模型&#xff08;二维表格模型&#xff09;基础上&#xff0c;一般面向于记录。 SQL 语句&#xff08;标准数据查询语言&#xff09;就是一种基于…

FP7122 具有平均模式恒定电流控制的LED驱动器芯片

FP7122 具有平均模式恒定电流控制的LED驱动器芯片 一般说明 FP7122是在恒定关闭时间模式下工作的平均电流模式控制LED驱动器IC。FP7122不产生峰值到平均的误差&#xff0c;因此大大提高了LED电流的精度、线路和负载调节&#xff0c;而不需要任何回路补偿或高侧电流传感。输出的…

VB:二分法查找

VB&#xff1a;二分法查找 二分查找算法 Private Sub Command1_Click()Dim i%, m%, n%Dim x(1 To 10) As SingleFor i 1 To 10x(i) Val(InputBox("请输入"))Next iCall bubbleSort(x)For i LBound(x) To UBound(x) LBound(x)和UBound(x)是用于获取数组x的下界和上…

运营技巧|如何在不同的平台上高效批量管理账户?

在当今全球化时代&#xff0c;中国出海企业和B2B外贸企业越来越重视海外社媒营销&#xff0c;这已成为企业抢占市场份额的关键。并且&#xff0c;为了获取到更多流量&#xff0c;跨境人们还会开通Facebook、Twitter、Google、TikTok、Instagram等平台账号&#xff0c;搭建自己的…

在SpringBoot项目加入冲突动态监测算法

一、什么是冲突动态监测算法 冲突动态监测算法是一种网络通信中的冲突检测方法&#xff0c;适用于无线网络或其他共享传输介质的环境。该算法通过监测网络中的冲突状况&#xff0c;以避免数据冲突导致的网络拥塞和性能下降等问题。 具体来说&#xff0c;冲突动态监测算法利用…

教你制作作业查询系统

嗨&#xff0c;各位老师们&#xff0c;今天我要给你们介绍一个超级方便的工具——易查分&#xff01;你知道吗&#xff0c;利用易查分&#xff0c;我们可以轻松制作一个便捷高效的作业查询系统哦&#xff01; 是不是想有个自己的分班or成绩查询页面&#xff1f;博主给老师们争取…

Go Tip02 指针类型 、值类型和引用类型 、标识符的命名规范

文章目录 一、指针类型二、值类型和引用类型三、标识符的命名规范 一、指针类型 package mainimport "fmt"func main() {saylocation()}func saylocation() {// 指针类型// 基本数据类型&#xff0c;变量存的是值// 用&获取变量的地址// 基本数据类型在内存的布…