python 使用 asyncio 包处理并发

文章目录

    • 1. 线程与协程对比
    • 2. 使用 asyncio 和 aiohttp 下载
    • 3. 避免阻塞型调用
    • 4. 使用 asyncio.as_completed
    • 5. 使用Executor对象,防止阻塞事件循环
    • 6. 从回调到期物和协程

learn from 《流畅的python》

1. 线程与协程对比

threading

import threading
import itertools
import time
import sysclass Signal:go = Truedef spin(msg, signal):write, flush = sys.stdout.write, sys.stdout.flushfor char in itertools.cycle("|/-\\"):  # 无限循环status = char + ' ' + msgwrite(status)flush()write("\x08" * len(status))  # \x08 退格键,光标移动回去time.sleep(0.1)if not signal.go:breakwrite(' ' * len(status) + "\x08" * len(status))# 使用空格清除状态消息,把光标移回开头def slow_function():  # 假设是一个耗时的计算过程time.sleep(10)  # sleep 会阻塞主线程,释放GIL,创建从属线程return 42def supervisor():  # 该函数,设置从属线程,显示线程对象,运行耗时的计算,最后杀死线程signal = Signal()spinner = threading.Thread(target=spin, args=("thinking!", signal))print("spinner object:", spinner)  # 显示从属线程对象spinner.start()  # 启动从属线程result = slow_function()  # 运行计算程序,阻塞主线程,从属线程动画显示旋转指针signal.go = False  # 改变signal 状态,终止 spin 中的for循环spinner.join()  # 等待spinner线程结束return resultdef main():result = supervisor()  # 运行 supervisorprint("Answer:", result)if __name__ == '__main__':main()

适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用(语法过时了,新版的用 async / await )
或者把协程传给 asyncio 包中的某个函数

一篇博文参考:https://www.cnblogs.com/dhcn/p/9032461.html

import asyncio
import itertools
import sys# https://docs.python.org/3.8/library/asyncio.html
async def spin(msg): # py3.5以后的新语法 async / await,协程函数write, flush = sys.stdout.write, sys.stdout.flushfor char in itertools.cycle("|/-\\"):  # 无限循环status = char + ' ' + msgwrite(status)flush()write("\x08" * len(status))  # \x08 退格键,光标移动回去try:await asyncio.sleep(0.1)except asyncio.CancelledError: # 遇到取消异常,退出循环print("cancel")breakwrite(' ' * len(status) + "\x08" * len(status))print("end spin")async def slow_function(): # 协程函数print("start IO")await asyncio.sleep(3) # 假装进行 IO 操作print("end IO  ")return 42async def supervisor():  # 协程函数spinner = asyncio.ensure_future(spin("thinking!")) # spinner 排定任务print("spinner object:", spinner)  # 显示从属线程对象# spinner object: <Task pending coro=<spin() running at D:\ >print("start slow")result = await slow_function()print("end slow")spinner.cancel() # task对象可以取消,抛出CancelledError异常return resultdef main():loop = asyncio.get_event_loop() # 获取事件循环的引用result = loop.run_until_complete(supervisor())  # 驱动 supervisor 协程,让它运行完毕loop.close()print("answer:", result)if __name__ == '__main__':main()

输出:

spinner object: <Task pending coro=<spin() running at D:\gitcode >
start slow
start IO
end IO  ng!(期间thinking!在输出,后来被覆盖)
end slow
cancel
end spin
answer: 42请按任意键继续. . .

2. 使用 asyncio 和 aiohttp 下载

import time
import sys
import os
import asyncio
import aiohttpPOP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'def save_flag(img, filename):  # 保存图像path = os.path.join(DEST_DIR, filename)with open(path, 'wb') as fp:fp.write(img)def show(text):  # 打印信息print(text, end=' ')sys.stdout.flush()async def get_flag(cc):  # 获取图像url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())async with aiohttp.request("GET", url) as resp:image = await resp.read()return imageasync def download_one(cc):image = await get_flag(cc)show(cc)save_flag(image, cc.lower() + '.gif')return ccdef download_many_(cc_list):loop = asyncio.get_event_loop()todo = [download_one(cc) for cc in sorted(cc_list)] # 协程对象wait_coro = asyncio.wait(todo) # 包装成 task,wait是协程函数,返回协程或者生成器对象res, _ = loop.run_until_complete(wait_coro)# 驱动协程,返回 第一个元素是一系列结束的期物,第二个元素是一系列未结束的期物# loop.close(),好像不需要这句 上面 with 处可能自动关闭了return len(res)def main(download_many):t0 = time.time()count = download_many(POP20_CC)elapsed = time.time() - t0msg = '\n{} flags downloaded in {:.2f}s'print(msg.format(count, elapsed))  # 计时信息if __name__ == '__main__':main(download_many_)# US RU ID ET BR FR CN PH BD NG DE JP EG TR MX IN PK IR CD VN 
# 20 flags downloaded in 3.88s

3. 避免阻塞型调用

执行硬盘网络 I/O 操作的函数定义为 阻塞型函数
在这里插入图片描述
有两种方法能 避免阻塞型调用 中止整个应用程序 的进程:

  • 单独的线程中运行各个阻塞型操作
  • 把每个阻塞型操作 转换成非阻塞的异步调用 使用

4. 使用 asyncio.as_completed

import collections
import time
import sys
import os
import asyncio
from http import HTTPStatusimport aiohttp
from aiohttp import web
import tqdmPOP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000class FetchError(Exception):def __init__(self, country_code):self.country_code = country_codedef save_flag(img, filename):  # 保存图像path = os.path.join(DEST_DIR, filename)with open(path, 'wb') as fp:fp.write(img)def show(text):  # 打印信息print(text, end=' ')sys.stdout.flush()async def get_flag(cc):  # 获取图像url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())async with aiohttp.request("GET", url) as resp:if resp.status == 200:image = await resp.read()return imageelif resp.status == 404:raise web.HTTPNotFound()else:raise aiohttp.WebSocketError(code=resp.status, message=resp.reason)async def download_one(cc, semaphore, verbose):try:async with semaphore:image = await get_flag(cc)except web.HTTPNotFound:status = HTTPStatus.NOT_FOUNDmsg = "not found"except Exception as exc:raise FetchError(cc) from excelse:save_flag(image, cc.lower() + '.gif')status = HTTPStatus.OKmsg = "OK"if verbose and msg:print(cc, msg)return (status, cc)async def downloader_coro(cc_list, verbose, concur_req):  # 协程函数counter = collections.Counter()semaphore = asyncio.Semaphore(value=concur_req)  # 最多可以使用这个计数器的协程个数todo = [download_one(cc, semaphore, verbose=True) for cc in sorted(cc_list)]  # 协程对象列表todo_iter = asyncio.as_completed(todo)  # 获取迭代器,会在期物运行结束后返回期物if not verbose:todo_iter = tqdm.tqdm(todo_iter, total=len(cc_list))  # 迭代器传给tqdm,显示进度条for future in todo_iter:  # 迭代器运行结束的期物try:res = await future  # 获取期物对象的结果except FetchError as exc:country_code = exc.country_codetry:error_msg = exc.__cause__.args[0]except IndexError:error_msg = exc.__cause__.__class__.__name__if verbose and error_msg:msg = '*** Error for {}: {}'print(msg.format(country_code, error_msg))status = HTTPStatus.errorelse:status = res[0]counter[status] += 1  # 记录结果return counter  # 返回计数器def download_many_(cc_list, verbose, concur_req):loop = asyncio.get_event_loop()coro = downloader_coro(cc_list, verbose=verbose, concur_req=concur_req)# 实例化 downloader_coro协程,然后通过 run_until_complete 方法把它传给事件循环counts = loop.run_until_complete(coro)# loop.close() # 好像不需要这句 上面 with 处可能自动关闭了return countsdef main(download_many):t0 = time.time()count = download_many(POP20_CC, True, MAX_CONCUR_REQ)elapsed = time.time() - t0msg = '\n{} flags downloaded in {:.2f}s'print(msg.format(count, elapsed))  # 计时信息if __name__ == '__main__':main(download_many_)

5. 使用Executor对象,防止阻塞事件循环

  • loop.run_in_executor 方法把阻塞的作业(例如保存文件)委托给线程池做
async def download_one(cc, semaphore, verbose):try:async with semaphore:image = await get_flag(cc)except web.HTTPNotFound:status = HTTPStatus.NOT_FOUNDmsg = "not found"except Exception as exc:raise FetchError(cc) from excelse:# 因此保存文件时,整个应用程序都会冻结,为了避免,使用下面方法loop = asyncio.get_event_loop()  # 获取事件循环对象的引用loop.run_in_executor(None,  # 方法的第一个参数是 Executor 实例;# 如果设为 None,使用事件循环的默认 ThreadPoolExecutor 实例save_flag, image, cc.lower() + ".gif")#  余下的参数是可调用的对象,以及可调用对象的位置参数status = HTTPStatus.OKmsg = "OK"if verbose and msg:print(cc, msg)return (status, cc)

6. 从回调到期物和协程

  • 如果一个操作需要依赖之前操作的结果,那就得嵌套回调
def stage1(response1):request2 = step1(response1)api_call2(request2, stage2)def stage2(response2):request3 = step2(response2)api_call3(request3, stage3)def stage3(response3):tep3(response3)api_call1(request1, stage1)

好的写法:

async def three_stages(request1): response1 = await api_call1(request1) # 第一步 request2 = step1(response1) response2 = await  api_call2(request2) # 第二步 request3 = step2(response2) response3 = await  api_call3(request3)# 第三步 step3(response3) 
loop.create_task(three_stages(request1)) # 必须显式调度执行

协程 必须使用 事件循环 显式排定 协程的执行时间

异步系统避免用户级线程的开销,这是它能比多线程系统管理更多并发连接的主要原因

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/471715.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1991. 找到数组的中间位置(前缀和)

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始的整数数组 nums &#xff0c;请你找到 最左边 的中间位置 middleIndex &#xff08;也就是所有可能中间位置下标最小的一个&#xff09;。 中间位置 middleIndex 是满足 nums[0] nums[1] ... nums[middleIndex-1] n…

LeetCode 1992. 找到所有的农场组(BFS)

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始&#xff0c;大小为 m x n 的二进制矩阵 land &#xff0c;其中 0 表示一单位的森林土地&#xff0c;1 表示一单位的农场土地。 为了让农场保持有序&#xff0c;农场土地之间以矩形的 农场组 的形式存在。 每一个农场组都…

VS2015开发Android,自带模拟器无法调试、加载程序,算是坑吗

VS2015出来后&#xff0c;确定变化很大&#xff0c;什么android、ios的&#xff0c;不在话下。对于我这样传统型的人&#xff0c;也第一时间试用了一下&#xff08;vs2003->vs2008->vs2012->vs2015&#xff09;。以前用eclipse开发过android小程序&#xff0c;现在想试…

amd cpu 安卓模拟器_夜神模拟器常见问题解答_v20201025

MAC版本常见问题Mac版本模拟器常见问题MAC模拟器出现“您应该将它移到废纸篓”解决办法MAC版本模拟器功能介绍MAC版本模拟器界面Mac版模拟器可能无法启动(卡99%)的原因及解决方式其他问题如何在夜神模拟器中安装自己想要的游戏/应用Sample CA 2证书没网解决办法GlobalSignature…

LeetCode 502. IPO(优先队列)

文章目录1. 题目2. 解题1. 题目 假设 力扣&#xff08;LeetCode&#xff09;即将开始 IPO 。 为了以更高的价格将股票卖给风险投资公司&#xff0c;力扣 希望在 IPO 之前开展一些项目以增加其资本。 由于资源有限&#xff0c;它只能在 IPO 之前完成最多 k 个不同的项目。 帮助…

bash 不是内部或外部命令_Win10_cmd下提示:‘xxx’不是内部或外部命令,也不是可运行的程序 或批处理文件...

一、windows下命令行查看ip: win+R,调出【运行】,输入cmd,回车打开终端,输入ipconfig,查看成功: 二、可能出现的问题: ping 不是内部或外部命令,也不是可运行的程序或批处理文件。 ipconfig 不是内部或外部命令,也不是可运行的程序或批处理文件。 解决: 1、控制面板…

python 属性描述符

文章目录1. 描述符示例&#xff1a;验证属性2. 自动获取储存属性的名称3. 继承改进4. 覆盖型与非覆盖型描述符对比4.1 覆盖型描述符4.2 没有 __get__ 方法的覆盖型描述符4.3 非覆盖型描述符4.4 在类中覆盖描述符5. 描述符用法建议learn from 《流畅的python》 1. 描述符示例&a…

计算机声音与视频教程,电脑怎么录屏幕视频带声音 电脑录屏幕视频带声音教程...

电脑怎么录屏幕视频带声音&#xff0c;相信有许多的用户在工作上或者学习上也会有这种需求&#xff0c;自己对此也还是不太的了解&#xff0c;毕竟也是没有怎么使用过&#xff0c;对此也是想要知道这电脑怎么录屏幕视频带声音&#xff0c;其实也是有几个原因&#xff0c;下面就…

tensor转换为图片_为大家介绍图片转换pdf的经验总结!你找对方法了吗?

图片转换pdf怎么做&#xff1f;不少朋友们在学习和工作中都发现了PDF这种文件格式似乎格外的吃香&#xff0c;你收到的很多培训文件和在网上搜罗的一些学习文件&#xff0c;全部都是PDF格式&#xff0c;PDF格式这么吃香&#xff1f;甚至有人让你把图片都转成PDF格式&#xff0c…

LeetCode 2021 力扣杯全国秋季编程大赛(第384名)

文章目录1. 无人机方阵2. 心算挑战3. 黑白翻转棋4. 玩具套圈5. 十字路口的交通2021.9.11&#xff0c;周六比赛之前&#xff1a;早上去交大看看&#xff0c;本科毕业10年了&#xff0c;由于限流&#xff0c;校园里没有多少回校的校友。逛了逛&#xff0c;跟太太和的她的同学一起…

java 画笔跟swing组件_Java学习教程(基础)--Java版本历史(二)

Java语言自JDK1.0版本以来经历了许多次更新&#xff0c;也在基本程序库中增加了大量的类别和包。从J2SE 1.4开始&#xff0c;Java语言的变动由 Java Community Process&#xff08;JCP&#xff09;管理&#xff0c;JCP使用Java规范请求&#xff08;Java Specification Requests…

计算机网络路由选择协议,IP路由选择协议原理和作用

IP路由选择协议原理和作用(2008-10-20 19:26:17)标签&#xff1a;杂谈IP路由选择如果目的主机与源主机直接相连或都在一个共享网络上,那就直接把包发送到目的主机,如果不是,那把ip数据报送到默认路由器,由它转发路由器使用路由表保存自己知道的网络的信息,它包括:目的IP地址,它…

LeetCode 2000. 反转单词前缀

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始的字符串 word 和一个字符 ch 。 找出 ch 第一次出现的下标 i &#xff0c;反转 word 中从下标 0 开始、直到下标 i 结束&#xff08;含下标 i &#xff09;的那段字符。 如果 word 中不存在字符 ch &#xff0c;则无需进…

oss图片跨域问题_图片存储解决方案-阿里云对象存储

开通对象存储OSS1. 打开阿里云官网&#xff0c;选择产品对象存储 OSS 2.开通对象存储OSS 需要支付宝扫码登录3.进入管理控制台 4.新建存储空间 5.跨域资源共享(CORS)的设置管理文件整合Springboot实现图片上传服务端签名直传并设置上传回调 : 在服务端完成签名&#xff0c;并且…

2015-8-10工作日志

1. 工作规划&#xff1a;完成系统请假管理的功能。 &#xff08;1&#xff09;根据系统需求完成请假管理model&#xff1b; &#xff08;2&#xff09;进行请假流程的deployment&#xff1b; &#xff08;3&#xff09;进行请假流程的流程定义管理&#xff1b; &#xff08;4&a…

LeetCode 2001. 可互换矩形的组数

文章目录1. 题目2. 解题1. 题目 用一个下标从 0 开始的二维整数数组 rectangles 来表示 n 个矩形&#xff0c;其中 rectangles[i] [widthi, heighti] 表示第 i 个矩形的宽度和高度。 如果两个矩形 i 和 j&#xff08;i < j&#xff09;的宽高比相同&#xff0c;则认为这两…

google 浏览器默认打开控制台_前端开发调试:浏览器console方法总结

今天突发奇想&#xff0c;准备总结下console的各个函数。以前都是只用一个console.log(),查了一下发现有好多&#xff0c;就记下来&#xff0c;方便以后查阅。速记consoleConsole对象提供浏览器控制台的接入&#xff0c;不同浏览器是不一样的&#xff0c;这里介绍普遍存在的Con…

江小白包装设计原型_雪碧和江小白的品牌跨界合作之旅可谓是一场品牌包装的视觉盛宴...

大家好&#xff0c;我是古小一&#xff0c;一个行走在酒水品牌包装设计不归路上的小编&#xff01;当下品牌间的跨界合作越来越多&#xff0c;消费者不但有审美疲劳的趋势&#xff0c;脑洞过大的跨界还容易引发群嘲。不过好在有热情网友的帮助&#xff0c;雪碧与江小白已经自然…

LeetCode 2002. 两个回文子序列长度的最大乘积(状态压缩+枚举状态子集+预处理)

文章目录1. 题目2. 解题2.1 超时2.2 预处理优化1. 题目 给你一个字符串 s &#xff0c;请你找到 s 中两个 不相交回文子序列 &#xff0c;使得它们长度的 乘积最大 。 两个子序列在原字符串中如果没有任何相同下标的字符&#xff0c;则它们是 不相交 的。 请你返回两个回文子…

[hdu5372 Segment Game]树状数组

题意&#xff1a;有两种操作&#xff1a;(1)插入线段&#xff0c;第i次插入的线段左边界为Li&#xff0c;长度为i (2)删除线段&#xff0c;删除第x次插入的线段。每次插入线段之前询问有多少条线段被它覆盖。 思路&#xff1a;由于插入的线段长度是递增的&#xff0c;所以第i次…