python 使用 asyncio 包处理并发

文章目录

    • 1. 线程与协程对比
    • 2. 使用 asyncio 和 aiohttp 下载
    • 3. 避免阻塞型调用
    • 4. 使用 asyncio.as_completed
    • 5. 使用Executor对象,防止阻塞事件循环
    • 6. 从回调到期物和协程

learn from 《流畅的python》

1. 线程与协程对比

threading

import threading
import itertools
import time
import sysclass Signal:go = Truedef spin(msg, signal):write, flush = sys.stdout.write, sys.stdout.flushfor char in itertools.cycle("|/-\\"):  # 无限循环status = char + ' ' + msgwrite(status)flush()write("\x08" * len(status))  # \x08 退格键,光标移动回去time.sleep(0.1)if not signal.go:breakwrite(' ' * len(status) + "\x08" * len(status))# 使用空格清除状态消息,把光标移回开头def slow_function():  # 假设是一个耗时的计算过程time.sleep(10)  # sleep 会阻塞主线程,释放GIL,创建从属线程return 42def supervisor():  # 该函数,设置从属线程,显示线程对象,运行耗时的计算,最后杀死线程signal = Signal()spinner = threading.Thread(target=spin, args=("thinking!", signal))print("spinner object:", spinner)  # 显示从属线程对象spinner.start()  # 启动从属线程result = slow_function()  # 运行计算程序,阻塞主线程,从属线程动画显示旋转指针signal.go = False  # 改变signal 状态,终止 spin 中的for循环spinner.join()  # 等待spinner线程结束return resultdef main():result = supervisor()  # 运行 supervisorprint("Answer:", result)if __name__ == '__main__':main()

适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用(语法过时了,新版的用 async / await )
或者把协程传给 asyncio 包中的某个函数

一篇博文参考:https://www.cnblogs.com/dhcn/p/9032461.html

import asyncio
import itertools
import sys# https://docs.python.org/3.8/library/asyncio.html
async def spin(msg): # py3.5以后的新语法 async / await,协程函数write, flush = sys.stdout.write, sys.stdout.flushfor char in itertools.cycle("|/-\\"):  # 无限循环status = char + ' ' + msgwrite(status)flush()write("\x08" * len(status))  # \x08 退格键,光标移动回去try:await asyncio.sleep(0.1)except asyncio.CancelledError: # 遇到取消异常,退出循环print("cancel")breakwrite(' ' * len(status) + "\x08" * len(status))print("end spin")async def slow_function(): # 协程函数print("start IO")await asyncio.sleep(3) # 假装进行 IO 操作print("end IO  ")return 42async def supervisor():  # 协程函数spinner = asyncio.ensure_future(spin("thinking!")) # spinner 排定任务print("spinner object:", spinner)  # 显示从属线程对象# spinner object: <Task pending coro=<spin() running at D:\ >print("start slow")result = await slow_function()print("end slow")spinner.cancel() # task对象可以取消,抛出CancelledError异常return resultdef main():loop = asyncio.get_event_loop() # 获取事件循环的引用result = loop.run_until_complete(supervisor())  # 驱动 supervisor 协程,让它运行完毕loop.close()print("answer:", result)if __name__ == '__main__':main()

输出:

spinner object: <Task pending coro=<spin() running at D:\gitcode >
start slow
start IO
end IO  ng!(期间thinking!在输出,后来被覆盖)
end slow
cancel
end spin
answer: 42请按任意键继续. . .

2. 使用 asyncio 和 aiohttp 下载

import time
import sys
import os
import asyncio
import aiohttpPOP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'def save_flag(img, filename):  # 保存图像path = os.path.join(DEST_DIR, filename)with open(path, 'wb') as fp:fp.write(img)def show(text):  # 打印信息print(text, end=' ')sys.stdout.flush()async def get_flag(cc):  # 获取图像url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())async with aiohttp.request("GET", url) as resp:image = await resp.read()return imageasync def download_one(cc):image = await get_flag(cc)show(cc)save_flag(image, cc.lower() + '.gif')return ccdef download_many_(cc_list):loop = asyncio.get_event_loop()todo = [download_one(cc) for cc in sorted(cc_list)] # 协程对象wait_coro = asyncio.wait(todo) # 包装成 task,wait是协程函数,返回协程或者生成器对象res, _ = loop.run_until_complete(wait_coro)# 驱动协程,返回 第一个元素是一系列结束的期物,第二个元素是一系列未结束的期物# loop.close(),好像不需要这句 上面 with 处可能自动关闭了return len(res)def main(download_many):t0 = time.time()count = download_many(POP20_CC)elapsed = time.time() - t0msg = '\n{} flags downloaded in {:.2f}s'print(msg.format(count, elapsed))  # 计时信息if __name__ == '__main__':main(download_many_)# US RU ID ET BR FR CN PH BD NG DE JP EG TR MX IN PK IR CD VN 
# 20 flags downloaded in 3.88s

3. 避免阻塞型调用

执行硬盘网络 I/O 操作的函数定义为 阻塞型函数
在这里插入图片描述
有两种方法能 避免阻塞型调用 中止整个应用程序 的进程:

  • 单独的线程中运行各个阻塞型操作
  • 把每个阻塞型操作 转换成非阻塞的异步调用 使用

4. 使用 asyncio.as_completed

import collections
import time
import sys
import os
import asyncio
from http import HTTPStatusimport aiohttp
from aiohttp import web
import tqdmPOP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000class FetchError(Exception):def __init__(self, country_code):self.country_code = country_codedef save_flag(img, filename):  # 保存图像path = os.path.join(DEST_DIR, filename)with open(path, 'wb') as fp:fp.write(img)def show(text):  # 打印信息print(text, end=' ')sys.stdout.flush()async def get_flag(cc):  # 获取图像url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())async with aiohttp.request("GET", url) as resp:if resp.status == 200:image = await resp.read()return imageelif resp.status == 404:raise web.HTTPNotFound()else:raise aiohttp.WebSocketError(code=resp.status, message=resp.reason)async def download_one(cc, semaphore, verbose):try:async with semaphore:image = await get_flag(cc)except web.HTTPNotFound:status = HTTPStatus.NOT_FOUNDmsg = "not found"except Exception as exc:raise FetchError(cc) from excelse:save_flag(image, cc.lower() + '.gif')status = HTTPStatus.OKmsg = "OK"if verbose and msg:print(cc, msg)return (status, cc)async def downloader_coro(cc_list, verbose, concur_req):  # 协程函数counter = collections.Counter()semaphore = asyncio.Semaphore(value=concur_req)  # 最多可以使用这个计数器的协程个数todo = [download_one(cc, semaphore, verbose=True) for cc in sorted(cc_list)]  # 协程对象列表todo_iter = asyncio.as_completed(todo)  # 获取迭代器,会在期物运行结束后返回期物if not verbose:todo_iter = tqdm.tqdm(todo_iter, total=len(cc_list))  # 迭代器传给tqdm,显示进度条for future in todo_iter:  # 迭代器运行结束的期物try:res = await future  # 获取期物对象的结果except FetchError as exc:country_code = exc.country_codetry:error_msg = exc.__cause__.args[0]except IndexError:error_msg = exc.__cause__.__class__.__name__if verbose and error_msg:msg = '*** Error for {}: {}'print(msg.format(country_code, error_msg))status = HTTPStatus.errorelse:status = res[0]counter[status] += 1  # 记录结果return counter  # 返回计数器def download_many_(cc_list, verbose, concur_req):loop = asyncio.get_event_loop()coro = downloader_coro(cc_list, verbose=verbose, concur_req=concur_req)# 实例化 downloader_coro协程,然后通过 run_until_complete 方法把它传给事件循环counts = loop.run_until_complete(coro)# loop.close() # 好像不需要这句 上面 with 处可能自动关闭了return countsdef main(download_many):t0 = time.time()count = download_many(POP20_CC, True, MAX_CONCUR_REQ)elapsed = time.time() - t0msg = '\n{} flags downloaded in {:.2f}s'print(msg.format(count, elapsed))  # 计时信息if __name__ == '__main__':main(download_many_)

5. 使用Executor对象,防止阻塞事件循环

  • loop.run_in_executor 方法把阻塞的作业(例如保存文件)委托给线程池做
async def download_one(cc, semaphore, verbose):try:async with semaphore:image = await get_flag(cc)except web.HTTPNotFound:status = HTTPStatus.NOT_FOUNDmsg = "not found"except Exception as exc:raise FetchError(cc) from excelse:# 因此保存文件时,整个应用程序都会冻结,为了避免,使用下面方法loop = asyncio.get_event_loop()  # 获取事件循环对象的引用loop.run_in_executor(None,  # 方法的第一个参数是 Executor 实例;# 如果设为 None,使用事件循环的默认 ThreadPoolExecutor 实例save_flag, image, cc.lower() + ".gif")#  余下的参数是可调用的对象,以及可调用对象的位置参数status = HTTPStatus.OKmsg = "OK"if verbose and msg:print(cc, msg)return (status, cc)

6. 从回调到期物和协程

  • 如果一个操作需要依赖之前操作的结果,那就得嵌套回调
def stage1(response1):request2 = step1(response1)api_call2(request2, stage2)def stage2(response2):request3 = step2(response2)api_call3(request3, stage3)def stage3(response3):tep3(response3)api_call1(request1, stage1)

好的写法:

async def three_stages(request1): response1 = await api_call1(request1) # 第一步 request2 = step1(response1) response2 = await  api_call2(request2) # 第二步 request3 = step2(response2) response3 = await  api_call3(request3)# 第三步 step3(response3) 
loop.create_task(three_stages(request1)) # 必须显式调度执行

协程 必须使用 事件循环 显式排定 协程的执行时间

异步系统避免用户级线程的开销,这是它能比多线程系统管理更多并发连接的主要原因

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/471715.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ai预测占比_2019-2021年中国AI芯片市场预测与展望数据

来源&#xff1a;雪球App&#xff0c;作者&#xff1a; IC咖啡&#xff0c;(https://xueqiu.com/1118999957/139121699)预计未来三年AI芯片市场规模仍将保持50%以上的增长速度&#xff0c;到2019年中国AI芯片市场规模将达到124.1亿元。从细分市场结构来看&#xff0c;云端训练芯…

计算机专业英语第2版郭涛翻译,计算机专业英语

计算机专业英语作  者&#xff1a;郭涛 主编出版时间&#xff1a;2007年01月定  价&#xff1a;21.60I S B N &#xff1a;9787560939209所属分类&#xff1a; 大中专教材 > 研究生/本科/专科教材 &nbsp大中专教材 &nbsp语言学习 > 大学英语 &nbsp语…

ODS与数据仓库

数据仓库是目前主要的数据存储体系。数据仓库之增W.H.Inmon认为&#xff0c;数据仓库是指支持管理决策过程的、面向主题的、集成的、随时间而变的、持久的数据的集合。简单地说&#xff0c;一个数据仓库就一个自数据库的商业应用系统&#xff0c;该数据库的数据来自于其它的运作…

LeetCode 1991. 找到数组的中间位置(前缀和)

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始的整数数组 nums &#xff0c;请你找到 最左边 的中间位置 middleIndex &#xff08;也就是所有可能中间位置下标最小的一个&#xff09;。 中间位置 middleIndex 是满足 nums[0] nums[1] ... nums[middleIndex-1] n…

mysq命令行导出sql_mysql 命令行导入导出 sql

命令行source 导入数据库&#xff1a;代码如下复制代码1&#xff0c;将要导入的.sql文件移至bin文件下&#xff0c;这样的路径比较方便2&#xff0c;同上面导出的第1步3&#xff0c;进入MySQL&#xff1a;mysql -u 用户名 -p如我输入的命令行:mysql -u root -p (输入同样后会…

计算机应用基础第六章,《计算机应用基础》第六章习题

周南岳版第六章习题及答案《计算机应用基础》教材第6章习题一、填空题(1)计算机网络中共享资源指的是硬件、软件和___数据___资源。(2)计算机网络中&#xff0c;通信双方必须共同遵守的规则或约定&#xff0c;称为__协议__。(3)在计算机局域网中&#xff0c;将计算机连接到网络…

discuz MVC结构分析

Discuz软件经解压后产生的三个文件夹中的一个叫upload的成为网站的根目录。里面的内容可以在某些网站上在线阅读&#xff0c;如用好库编程网。也可以离线在本地阅读&#xff0c;如用VS.Php for Visual Studio。这里面的内容安排的井然有序。不同用途的文件都放在了不同的文件夹…

postforobject 设置代理_Spring RestTemplate和代理身份验证

小编典典经过许多不同的选择之后&#xff0c;由于能够在创建时为RestTemplate设置代理&#xff0c;因此我可以选择以下代码&#xff0c;因此我可以将其重构为单独的方法。只是要注意&#xff0c;它还具有其他依赖性&#xff0c;因此请记住这一点。private RestTemplate createR…

LeetCode 1992. 找到所有的农场组(BFS)

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始&#xff0c;大小为 m x n 的二进制矩阵 land &#xff0c;其中 0 表示一单位的森林土地&#xff0c;1 表示一单位的农场土地。 为了让农场保持有序&#xff0c;农场土地之间以矩形的 农场组 的形式存在。 每一个农场组都…

flash html5 chrome,为了支持 HTML5 ,谷歌 Chrome 浏览器将“封杀”Flash

关于谷歌 Chrome 将不再支持 Flash 的话题&#xff0c;已经争论了很长一段时间了。不过现在&#xff0c;这家互联网巨头终于确认&#xff0c;为了支持 HTML5&#xff0c;Chrome 浏览器将不再运行 Flash。据谷歌表示&#xff0c;由于 Flash 会减慢网页浏览速度并可能引发信息安全…

VS2015开发Android,自带模拟器无法调试、加载程序,算是坑吗

VS2015出来后&#xff0c;确定变化很大&#xff0c;什么android、ios的&#xff0c;不在话下。对于我这样传统型的人&#xff0c;也第一时间试用了一下&#xff08;vs2003->vs2008->vs2012->vs2015&#xff09;。以前用eclipse开发过android小程序&#xff0c;现在想试…

amd cpu 安卓模拟器_夜神模拟器常见问题解答_v20201025

MAC版本常见问题Mac版本模拟器常见问题MAC模拟器出现“您应该将它移到废纸篓”解决办法MAC版本模拟器功能介绍MAC版本模拟器界面Mac版模拟器可能无法启动(卡99%)的原因及解决方式其他问题如何在夜神模拟器中安装自己想要的游戏/应用Sample CA 2证书没网解决办法GlobalSignature…

LeetCode 502. IPO(优先队列)

文章目录1. 题目2. 解题1. 题目 假设 力扣&#xff08;LeetCode&#xff09;即将开始 IPO 。 为了以更高的价格将股票卖给风险投资公司&#xff0c;力扣 希望在 IPO 之前开展一些项目以增加其资本。 由于资源有限&#xff0c;它只能在 IPO 之前完成最多 k 个不同的项目。 帮助…

计算机水平考试ppt试卷,计算机等级考试一级笔试卷库.ppt

计算机等级考试一级笔试卷库计算机等级考试一级笔试题库 全国计算机等级考试一级题库 1&#xff0e;微机中1K字节表示的二进制位数是( )。? A、1000 B、8x 1000 C、1024 D、8x1024?? 2&#xff0e;计算机硬件能直接识别和执行的只有( )。 A、高级语言 B、符号语言 C、汇编语…

mysql中ibatis的limit动态传参

param.put("pageNo",pageNo); param.put("pageSize",pageSize); sqlMap中的用法 limit ${pageNo},${pageSize} 我今天在敲代码的时候这么用结果就报错了:limit #pageNo#,#pageSize# 后来查资料才知道 limit 后面不应该用#&#xff08;我传进来的是Intege…

牛客网软通动力软件测试机试_软通动力软件测试笔试题

.;.1&#xff0e;软件测试是软件开发的重要环节&#xff0c;进行软件测试的目的是()A)证明软件错误不存在B)证明软件错误的存在C)改正程序所有的错误D)发现程序所有的错误2&#xff0e;对于软件质量描述不正确的是&#xff1a;()A)高质量的过程产生高质量的产品B)软件质量是测试…

公安计算机技能测试题库,2018公安文职考试题库:行政职业能力测验

【导语】中公重庆招警考试网为大家带来公安文职考试练习题&#xff1a;行政职业能力测验&#xff0c;帮助各位考生顺利备考公安文职考试笔试。1.美国中央情报局和国家保安局根据计算机专家提供的数据&#xff0c;已充分认识到军用电子系统的一个致命弱点——容易遭到计算机病毒…

OpenGL于MFC使用汇总(三)——离屏渲染

有时直接创建OpenGL形式不适合&#xff0c;或者干脆不同意然后创建一个表单&#xff0c;正如我现在这个项目&#xff0c;创建窗体不显示&#xff0c;它仅限于主框架。而我只是ActiveX里做一些相关工作&#xff0c;那仅仅能用到OpenGL的离屏渲染技术了~即不直接绘制到窗体上&…

python 动态属性和特性

文章目录1. 使用动态属性转换数据2. property2.1 help() 文档3. 特性工厂函数4. 属性删除操作5. 处理属性的重要属性和函数5.1 处理属性的内置函数5.2 处理属性的特殊方法learn from 《流畅的python》 1. 使用动态属性转换数据 在 Python 中&#xff0c;数据的属性和处理数据…

bash 不是内部或外部命令_Win10_cmd下提示:‘xxx’不是内部或外部命令,也不是可运行的程序 或批处理文件...

一、windows下命令行查看ip: win+R,调出【运行】,输入cmd,回车打开终端,输入ipconfig,查看成功: 二、可能出现的问题: ping 不是内部或外部命令,也不是可运行的程序或批处理文件。 ipconfig 不是内部或外部命令,也不是可运行的程序或批处理文件。 解决: 1、控制面板…