Python并发——concurrent.futures梳理

Python并发——concurrent.futures梳理

参考官方文档: concurrent.futures — 启动并行任务

Executor对象

class concurrent.funtures.Executor

该抽象类是 ThreadPoolExecutorProcessPoolExecutor 的父类,提供异步执行调用方法。要通过它的子类调用,而不是直接调用。

submit方法

submit(fn, /, *args, **kwargs)

提交一个可执行对象,fn 将按照如下方式被调用:fn(*args. **kwargs),并返回一个 Future 对象(下面会详细说)表示执行的结果,

例:

with ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(pow, 323, 1235)print(future.result())

map方法

map(func, *iterables, timeout=None, chunksize=1)
  • 类似于 map(func, *iterables) 函数,但是以下两点不同:

    • iterables 是立即执行而不是延迟执行的;
    • func 是异步执行的,对 func 的多个调用可以并发执行。
  • 如果从原始调用到 Executor.map() 经过 timeout 秒后, __next__() 已被调用且返回的结果还不可用,那么已返回的迭代器将触发 concurrent.futures.TimeoutErrortimeout 可以是整数或浮点数。如果 timeout 没有指定或为 None ,则没有超时限制。

  • 使用 ProcessPoolExecutor 时,这个方法会将 iterables 分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由 chunksize 指定正整数设置。 对很长的迭代器来说,使用大的 chunksize 值比默认值 1 能显著地提高性能。 chunksizeThreadPoolExecutor 没有效果。

  • 如果 func 调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。

  • iterables 是一个可迭代对象,其中每个元素是传递给 func 的参数列表,与 submit 方法直接传参不同的是,如果需要传递给 func 函数多个列表,通常需要再加一个中间层来进行列表参数的解析。python 线程池map()方法传递多参数list

shutdown方法

shutdown(wait=True, *, cancel_futures=False)

当待执行的 future 对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。 在关闭后调用 Executor.submit()Executor.map() 将会引发 RuntimeError

  • 如果 waitTrue 则此方法只有在所有待执行的 future 对象完成执行且释放已分配的资源后才会返回;

  • 如果 waitFalse,方法立即返回,所有待执行的 future 对象完成执行后会释放已分配的资源。 不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出;

  • 如果 cancel_futuresTrue,此方法将取消所有执行器还未开始运行的挂起的 future。 任何已完成或正在运行的 future 将不会被取消,无论 cancel_futures 的值是什么;

  • 如果 cancel_futureswait 均为 True,则执行器已开始运行的所有 Future 将在此方法返回之前完成。 其余的 Future 会被取消。

如果使用 with 语句,你就可以避免显式调用这个方法,它将会停止 Executor (就相当于 Executor.shutdown() 调用时 wait 设为 True 一样等待):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:e.submit(shutil.copy, 'src1.txt', 'dest1.txt')e.submit(shutil.copy, 'src2.txt', 'dest2.txt')e.submit(shutil.copy, 'src3.txt', 'dest3.txt')e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ThreadPoolExecutor

ThreadPoolExecutorExecutor 的子类,它使用线程池来异步执行调用。

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
  • 使用最多 max_workers 个线程的线程池来异步执行调用。

  • (Python3.7之后)initializer 是在每个 worker 线程开始处调用的一个可选可调用对象。 initargs 是传递给 initializer 的元组参数。任何向池提交更多工作的尝试, initializer 都将引发一个异常,当前所有等待的工作都会引发一个 BrokenThreadPool

  • (Python3.5之后)如果 max_workersNone 或没有指定,将默认为机器处理器的个数,假如ThreadPoolExecutor 则重于I/O操作而不是CPU运算,那么可以乘以 5 ,同时工作线程的数量可以比 ProcessPoolExecutor 的数量高。

  • (Python3.6之后)thread_name_prefix 参数允许用户控制由线程池创建的 threading.Thread 工作线程名称以方便调试。

  • (Python3.8之后)max_workers 的默认值已改为 min(32, os.cpu_count() + 4)。 这个默认值会保留至少 5 个工作线程用于 I/O 密集型任务。 对于那些释放了 GIL 的 CPU 密集型任务,它最多会使用 32 个 CPU 核心。这样能够避免在多核机器上不知不觉地使用大量资源。现在 ThreadPoolExecutor 在启动 max_workers 个工作线程之前也会重用空闲的工作线程。

  • 当回调已关联了一个 Future 然后再等待另一个Future 的结果时就会发产死锁情况。例如:

    # 例一
    import time
    def wait_on_b():time.sleep(5)print(b.result())  # b will never complete because it is waiting on a.return 5def wait_on_a():time.sleep(5)print(a.result())  # a will never complete because it is waiting on b.return 6executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)# 例二
    def wait_on_future():f = executor.submit(pow, 5, 2)# This will never complete because there is only one worker thread and# it is executing this function.print(f.result())executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_on_future)
    

ThreadPoolExecutor 实例:

import concurrent.futures
import urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']# Retrieve a single page and report the URL and contents
def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:# Start the load operations and mark each future with its URLfuture_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 类是 Executor 的子类,它使用进程池来异步地执行调用。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
  • ProcessPoolExecutor 会使用 multiprocessing 模块,这允许它绕过 GIL,但也意味着只可以处理和返回可封存的对象。

  • __main__ 模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor不可以工作在交互式解释器中。

  • 从可调用对象中调用 ExecutorFuture 的方法提交给 ProcessPoolExecutor 会导致死锁。这与 ThreadPoolExecutor 类似。

  • (Python3.7之后)异步地执行调用的 Executor 子类使用最多具有 max_workers 个进程的进程池。 如果 max_workersNone 或未给出,它将默认为机器的处理器个数。 如果 max_workers 小于等于 0,则将引发 ValueError。 在 Windows 上,max_workers 必须小于等于 61,否则将引发 ValueError。 如果 max_workersNone,则所选择的默认值最多为 61,即使存在更多的处理器。 mp_context 可以是一个多进程上下文或是 None。 它将被用来启动工作进程。 如果 mp_contextNone 或未给出,则将使用默认的多进程上下文。

  • (Python3.7之后)initializer 是一个可选的可调用对象,它会在每个工作进程启动时被调用;initargs 是传给 initializer 的参数元组。 如果 initializer 引发了异常,则所有当前在等待的任务以及任何向进程池提交更多任务的尝试都将引发 BrokenProcessPool

  • (Python3.3之后)如果其中一个工作进程被突然终止,BrokenProcessPool 就会马上触发。 可预计的行为没有定义,但执行器上的操作或它的 future 对象会被冻结或死锁。

可以看到,大部分参数含义与 ThreadPoolExecutor 是一致的。

ProcessPoolExecutor 实例:

import concurrent.futures
import mathPRIMES = [112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]def is_prime(n):if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falsesqrt_n = int(math.floor(math.sqrt(n)))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef main():with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime))if __name__ == '__main__':main()

Future对象

Future 类将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建。将可调用对象封装为异步执行。Future 实例由 Executor.submit()创建,除非测试,不应手动直接创建。

class concurrent.futures.Future
  • cancel()

    尝试取消调用。 如果调用正在执行或已结束运行不能被取消则该方法将返回 False,否则调用会被取消并且该方法将返回 True

  • cancelled()

    如果调用成功取消返回 True

  • running()

    如果调用正在执行而且不能被取消那么返回 True

  • done()

    如果调用已被取消或正常结束那么返回 True

  • result(timeout=None)

    返回调用返回的值。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为 None,那么等待时间就没有限制。如果 futrue 在完成前被取消则 CancelledError 将被触发。如果调用引发了一个异常,这个方法也会引发同样的异常。

  • exception(timeout=None)

    返回由调用引发的异常。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为 None,那么等待时间就没有限制。如果 futrue 在完成前被取消则 CancelledError 将被触发。如果调用正常完成那么返回 None

  • add_done_callback(fn)

    附加可调用 fnfuture 对象。当 future 对象被取消或完成运行时,将会调用 fn,而这个 future 对象将作为它唯一的参数。加入的可调用对象总被属于添加它们的进程中的线程按加入的顺序调用。如果可调用对象引发一个 Exception 子类,它会被记录下来并被忽略掉。如果可调用对象引发一个 BaseException 子类,这个行为没有定义。如果 future 对象已经完成或已取消,fn 会被立即调用。

下面这些 Future 方法用于单元测试和 Executor 实现,一般不会用到,就不详细介绍了。

  • set_running_or_notify_cancel()

  • set_result(result)

  • set_exception(exception)

concurrent.futures模块的函数

  • wait

    concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
    

    等待 fs 所指定的 Future 实例(可能有不同的 Exectutor 实例创建)完成。fs 若给出重复的 futures 将被移除,只返回一次。

    • 返回值是一个 set 组成的命名二元组,其中第一个 set 叫做 done,包含完成(包括 finished 和 cancelled)的 futures,第二个 set 叫做 not_done,包含未完成(包括 pending 和 running)的 futures。

    • timeout 可以用来控制返回前最大的等待秒数。 timeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None ,则不限制等待时间。

    • return_when 指定此函数应在何时返回。它必须为以下常数之一:

    常量描述
    FIRST_COMPLETED函数将在任意可等待对象结束或取消时返回。
    FIRST_EXCEPTION函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
    ALL_COMPLETED函数将在所有可等待对象结束或取消时返回。
  • as_complete

    concurrent.futures.as_completed(fs, timeout=None)
    

    返回一个包含 fs 所指定的 Future 实例(可能由不同的 Executor实例创建)的迭代器,这些实例会在完成时生成 future 对象(包括正常结束或被取消的 future 对象)。 任何由 fs 所指定的重复 future 对象将只被返回一次。 任何在 as_completed()被调用之前完成的 future 对象将优先被生成。 如果 __next__() 被调用并且在对 as_completed()的原始调用 timeout 秒之后结果仍不可用,则返回的迭代器将引发 concurrent.futures.TimeoutErrortimeout 可以为整数或浮点数。 如果 timeout 未指定或为 None,则不限制等待时间。

Exception 类

  • concurrent.futures.CancelledError

    future 对象被取消时会触发。

  • concurrent.futures.TimeoutError

future 对象执行超出给定的超时数值时引发。

  • concurrent.futures.BrokenExecutor

    当执行器被某些原因中断而且不能用来提交或执行新任务时就会被引发派生于 RuntimeError 的异常类。3.7 新版功能.

  • concurrent.futures.InvalidStateError

    当某个操作在一个当前状态所不允许的 future 上执行时将被引发。3.8 新版功能.

  • concurrent.futures.thread.BrokenThreadPool

    ThreadPoolExecutor 中的其中一个工作者初始化失败时会引发派生于 BrokenExecutor的异常类。3.7 新版功能.

  • concurrent.futures.process.BrokenProcessPool

ThreadPoolExecutor 中的其中一个 worker 不完整终止时(比如,被外部杀死)会引发派生于 BrokenExecutor( 原名 RuntimeError ) 的异常类。3.3 新版功能.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/532483.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TensorRT ONNX 基础

TensorRT ONNX 基础 tensorRT从零起步迈向高性能工业级部署&#xff08;就业导向&#xff09; 课程笔记&#xff0c;讲师讲的不错&#xff0c;可以去看原视频支持下。 概述 TensorRT 的核心在于对模型算子的优化&#xff08;合并算子、利用当前 GPU 特性选择特定的核函数等多种…

回文子串、回文子序列相关题目

回文子串、回文子序列相关题目 回文子串是要连续的&#xff0c;回文子序列可不是连续的。 516. 最长回文子序列 dp数组含义&#xff1a;dp[i][j]dp[i][j]dp[i][j] 表示子序列 s[i,j]s[i,j]s[i,j] 中的最长回文子序列的长度。 dp数组初始化&#xff1a;子序列长度为 1 时&am…

mmdetection tools工具梳理

mmdetection tools工具梳理 mmdetection 是一个非常好用的开源目标检测框架&#xff0c;我们可以用它方便地训练自己的目标检测模型&#xff0c;mmdetection 项目仓库提供许多实用的工具来实现帮助我们进行各种测试。本篇将梳理以下 mmdetection 项目仓库 tools 目录下的各种实…

TensorRT ONNX 基础(续)

TensorRT ONNX 基础&#xff08;续&#xff09; PyTorch正确导出ONNX 几条推荐的原则&#xff0c;可以减少潜在的错误&#xff1a; 对于任何使用到 shape、size 返回值的参数时&#xff0c;例如 tensor.view(tensor.size(0), -1) 这类操作&#xff0c;避免直接使用 tensor.s…

frp实现内网穿透极简教程

frp实现内网穿透极简教程 本文是内网穿透极简教程&#xff0c;为求简洁&#xff0c;我们不介绍为什么内网穿透也不介绍其原理&#xff0c;这里假设各位读者都已经明确的知道自己的目的&#xff0c;本文仅介绍如何安装配置 frp 实现内网穿透。 简单来说&#xff0c;内网穿透就…

图像预处理之warpaffine与双线性插值及其高性能实现

图像预处理之warpaffine与双线性插值及其高性能实现 视频讲解&#xff1a;https://www.bilibili.com/video/BV1ZU4y1A7EG 代码Repo&#xff1a;https://github.com/shouxieai/tensorRT_Pro 本文为视频讲解的个人笔记。 warpaffine矩阵变换 对于坐标点的变换&#xff0c;我们通…

LeetCode-10 正则表达式匹配

LeetCode-10 正则表达式匹配 动态规划 10. 正则表达式匹配 dp数组含义&#xff1a;dp[i][j]dp[i][j]dp[i][j] 表示 s[0:i−1]s[0:i-1]s[0:i−1] 能否被 p[0:j−1]p[0:j-1]p[0:j−1] 成功匹配。 状态转移方程 &#xff1a; 如果 s[i−1]p[j−1]s[i-1]p[j-1]s[i−1]p[j−1] …

shell if判断和for循环常见写法

shell if判断和for循环常见写法 转自&#xff1a; Shell中for循环的几个常用写法 Shell中if 条件判断总结 if常见写法 一、if的基本语法: if [ command ];then符合该条件执行的语句 elif [ command ];then符合该条件执行的语句 else符合该条件执行的语句 fibash shell会按顺序…

关于pytorch使用多个dataloader并使用zip和cycle来进行循环时出现的显存泄漏的问题

关于pytorch使用多个dataloader并使用zip和cycle来进行循环时出现的显存泄漏的问题 如果我们想要在 Pytorch 中同时迭代两个 dataloader 来处理数据&#xff0c;会有两种情况&#xff1a;一是我们按照较短的 dataloader 来迭代&#xff0c;长的 dataloader 超过的部分就丢弃掉…

neovim及coc.nvim自动补全初探

neovim及coc.nvim自动补全初探 安装 # mac # 安装 brew install neovim # 查看neovim安装路径 brew list nvim# ubuntu apt install neovim习惯了打开 vi/vim 的方式&#xff0c;可以用个 alias 在 ~/.zshrc 中设置一下&#xff1a; alias vi"nvim"插件 vim-plug…

sed 简明教程

sed 简明教程 转自&#xff1a;https://coolshell.cn/articles/9104.html awk于1977年出生&#xff0c;今年36岁本命年&#xff0c;sed比awk大2-3岁&#xff0c;awk就像林妹妹&#xff0c;sed就是宝玉哥哥了。所以 林妹妹跳了个Topless&#xff0c;他的哥哥sed坐不住了&#xf…

awk 简明教程

awk 简明教程 转自&#xff1a;https://coolshell.cn/articles/9070.html 有一些网友看了前两天的《Linux下应该知道的技巧》希望我能教教他们用awk和sed&#xff0c;所以&#xff0c;出现了这篇文章。我估计这些80后的年轻朋友可能对awk/sed这类上古神器有点陌生了&#xff0c…

应该知道的LINUX技巧

应该知道的LINUX技巧 转自&#xff1a;https://coolshell.cn/articles/8883.html 这篇文章来源于Quroa的一个问答《What are some time-saving tips that every Linux user should know?》—— Linux用户有哪些应该知道的提高效率的技巧。我觉得挺好的&#xff0c;总结得比较好…

[深度][PyTorch] DDP系列第一篇:入门教程

[深度][PyTorch] DDP系列第一篇&#xff1a;入门教程 转自&#xff1a;[原创][深度][PyTorch] DDP系列第一篇&#xff1a;入门教程 概览 想要让你的PyTorch神经网络在多卡环境上跑得又快又好&#xff1f;那你definitely需要这一篇&#xff01; No one knows DDP better than I…

[深度][PyTorch] DDP系列第二篇:实现原理与源代码解析

[深度][PyTorch] DDP系列第二篇&#xff1a;实现原理与源代码解析 转自&#xff1a;https://zhuanlan.zhihu.com/p/187610959 概览 想要让你的PyTorch神经网络在多卡环境上跑得又快又好&#xff1f;那你definitely需要这一篇&#xff01; No one knows DDP better than I do! …

[深度][PyTorch] DDP系列第三篇:实战与技巧

[深度][PyTorch] DDP系列第三篇&#xff1a;实战与技巧 转自&#xff1a;https://zhuanlan.zhihu.com/p/250471767 零. 概览 想要让你的PyTorch神经网络在多卡环境上跑得又快又好&#xff1f;那你definitely需要这一篇&#xff01; No one knows DDP better than I do! – – …

PIL、OpenCV中resize算子实现不同的问题

PIL、OpenCV中resize算子实现不同的问题 测试图像&#xff1a;https://raw.githubusercontent.com/TropComplique/ssd-pytorch/master/images/dogs-and-cats.jpg &#xff08;直接 wget 可获得&#xff09; 测试版本&#xff1a; opencv-python 4.4.0.46Pillow 8.0.1 测试代…

mac X11 XQuartz的安装与使用

mac X11 XQuartz的安装与使用 本地系统&#xff1a;MacOS 12.4 远程主机系统&#xff1a;Ubuntu 18.04 命令说明 ssh命令 ssh 命令大家很熟悉了&#xff0c;这里仅介绍与 X11 forwarding 相关的几个选项。 本部分译自 ssh 命令手册&#xff0c;可见 man ssh -X &#xf…

机器学习:系统设计与实现 分布式训练

机器学习系统:设计与实现 分布式训练 转自&#xff1a;https://openmlsys.github.io/chapter_distributed_training/index.html 随着机器学习的进一步发展&#xff0c;科学家们设计出更大型&#xff0c;更多功能的机器学习模型&#xff08;例如说&#xff0c;GPT-3&#xff09;…

Linux命令行及各常用工具代理设置

Linux命令行及各常用工具代理设置 命令行代理设置 1 通过命令行指定 直接为当前命令行设置代理 对当前终端的全部工具&#xff08;apt、curl、wget、git 等全都有效&#xff09;以下仅以 http 代理为例&#xff0c;如果是其他协议&#xff08;如 socks 等&#xff09;自行改…