一、核心要义
1. 对比一个简单的多线程程序和对应的asyncio版本,说明多线程和异步任务之间的关系
2. 网络下载的异步版
3. 在异步编程中,与回调相比,协程显著提升性能的方式
二、代码示例
1、相关知识点
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 10:14
# @Author : Maple
# @File : 00-相关知识点.py
# @Software: PyCharm
import sys
import collections

if __name__ == '__main__':
    out, push = sys.stdout.write, sys.stdout.flush
    text = 'Hello world'
    # '\x08' is the backspace control character: each one moves the cursor one
    # column to the left. Consoles that render it as "delete" (as the author
    # observed) show "Hello worl" after one and "Hello wo" after two; a real
    # terminal only moves the cursor, so the text stays until overwritten.
    for erase in (1, 2):
        out(text)
        push()
        out('\x08' * erase)
        out('\n')
    out(text)
    push()
    # Backing up by the full length of the string returns the cursor to the
    # start of the text, so whatever is printed next replaces all of it.
    out('\x08' * len(text))
2、指针旋转(多线程示例)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/8 22:05
# @Author : Maple
# @File : 01-指针旋转(多线程示例).py
# @Software: PyCharm
import itertools
import sys
import threading
import time


class Signal:
    """Mutable flag shared between the main thread and the spinner thread."""
    # Class-level default; supervisor() sets it to False on its own instance
    # to tell spin() to stop.
    go = True


def spin(msg, signal):
    """Animate a rotating cursor in front of *msg* until ``signal.go`` is False.

    Each frame is written, flushed and then erased with backspace characters so
    the next frame lands on top of it; the line is blanked out on exit.
    """
    out, push = sys.stdout.write, sys.stdout.flush
    frames = itertools.cycle('|/-\\')
    while True:
        frame = next(frames) + ' ' + msg
        out(frame)
        push()
        out('\x08' * len(frame))
        time.sleep(.1)
        if not signal.go:
            break
    out(' ' * len(frame) + '\x08' * len(frame))


def slow_function():
    """Pretend to wait on I/O for three seconds, then return 42."""
    # time.sleep blocks only the calling thread, so the spinner thread keeps
    # animating in the meantime.
    time.sleep(3)
    return 42


def supervisor():
    """Run slow_function() in the main thread while a helper thread spins.

    :return: the value produced by slow_function().
    """
    flag = Signal()
    worker = threading.Thread(target=spin, args=('thinking', flag))
    # Shows the thread object, e.g. <Thread(Thread-1, initial)>
    print('spinner object', worker)
    worker.start()
    answer = slow_function()
    # Ask the spinner to stop, then wait until its thread has finished.
    flag.go = False
    worker.join()
    return answer


def main():
    """Run the demo and print the answer."""
    print('Answer:', supervisor())


if __name__ == '__main__':
    main()
3、指针旋转(asyncio示例)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 11:20
# @Author : Maple
# @File : 02-指针旋转(asyncio示例).py
# @Software: PyCharm
import asyncio
import itertools
import sys
# Legacy, generator-based version of the spinner demo.
# Coroutines meant to be driven by asyncio were marked with @asyncio.coroutine
# (recommended, not mandatory). The decorator was deprecated in Python 3.8 and
# removed in Python 3.11, so on 3.11+ this script fails at import time with
# AttributeError. The async/await rewrite is in "asyncio example 2".
@asyncio.coroutine
def spin(msg):
    """Show a rotating cursor followed by *msg* until the task is cancelled.

    Each frame is written, flushed and then erased with backspace characters;
    after cancellation the status line is blanked out.
    """
    write,flush = sys.stdout.write,sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            # Replaces time.sleep: this sleep suspends only this coroutine
            # and does not block the event loop.
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Delivered here when supervisor() calls spinner.cancel().
            break
    write(' ' * len(status) + '\x08' * len(status))
@asyncio.coroutine
def slow_function():
    """Pretend to wait on I/O for 3 seconds, then return 42."""
    # Simulate waiting on I/O for a while.
    yield from asyncio.sleep(3)
    return 42
@asyncio.coroutine
def supervisor():
    """Run slow_function() while a spinner task animates; return its result."""
    # asyncio.create_task wraps the spin coroutine in a Task, schedules it on
    # the event loop and returns immediately.
    # A Task is also a Future, because asyncio.Task is a subclass of Future.
    spinner = asyncio.create_task(spin('thinking'))
    print('spinner object',spinner)
    # Drive slow_function and collect its return value (see the "delegating
    # generator" section of the chapter 16 coroutine notes). The string below
    # lists the levels: first-level delegating generator = supervisor,
    # second-level delegating generator = slow_function.
    """第一级委派生成器:supervisor第二级委派生成器:slow_function"""
    # Meanwhile the event loop keeps running: slow_function ends in
    # `yield from asyncio.sleep(3)`, which hands control back to the event
    # loop (running in the main thread), so the spinner task gets to run.
    result = yield from slow_function()
    spinner.cancel()
    return result
def main():
    """Drive supervisor() on the event loop and print the answer."""
    # Get a reference to the event loop.
    loop = asyncio.get_event_loop()
    # Drive supervisor() to completion; its return value becomes the return
    # value of this call. The event loop plays the role of the outermost
    # caller (see the "delegating generator" section in chapter 16).
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer',result)
if __name__ == '__main__':
    main()
4、指针旋转(asyncio示例2)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 11:20
# @Author : Maple
# @File : 03-指针旋转(asyncio示例2).py
# @Software: PyCharm
import asyncio
import itertools
import sys

# The @asyncio.coroutine decorator was deprecated in Python 3.8 and removed in
# Python 3.11; the official replacement is native ``async def`` coroutines.


async def spin(msg):
    """Show a rotating cursor followed by *msg* until this task is cancelled.

    Each frame is written and flushed, then erased with backspace characters so
    the next frame overwrites it; on cancellation the status line is blanked.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            # Unlike time.sleep, this suspends only this coroutine, so the
            # event loop stays free to run other tasks.
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Raised here by spinner.cancel() in supervisor(); stop animating.
            break
    write(' ' * len(status) + '\x08' * len(status))


async def slow_function():
    """Pretend to wait on I/O for 3 seconds, then return 42."""
    await asyncio.sleep(3)
    return 42


async def supervisor():
    """Run slow_function() while a spinner task animates, then stop the spinner.

    :return: the value produced by slow_function().
    """
    # create_task() wraps spin() in a Task, schedules it on the running event
    # loop and returns immediately.
    spinner = asyncio.create_task(spin('thinking'))
    # e.g. spinner object: <Task pending name='Task-2' coro=<spin() running at .../03-指针旋转(asyncio示例2).py:14>>
    print('spinner object:', spinner)
    print('spinner object type:', type(spinner))  # <class '_asyncio.Task'>
    # While slow_function() is suspended in ``await asyncio.sleep(3)``,
    # control returns to the event loop, which keeps running the spinner task.
    result = await slow_function()
    spinner.cancel()
    try:
        # Wait for spin() to process the cancellation and blank its status
        # line, instead of relying on the loop's scheduling order.
        await spinner
    except asyncio.CancelledError:
        # Only reached if the task was cancelled before spin() started.
        pass
    return result


def main():
    """Drive supervisor() to completion and print its result."""
    # asyncio.run() creates a fresh event loop, runs supervisor() on it and
    # always closes the loop afterwards; relying on asyncio.get_event_loop()
    # outside a running loop is deprecated.
    result = asyncio.run(supervisor())
    print('Answer', result)


if __name__ == '__main__':
    main()
5、网络下载asyncio版本(1)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 12:29
# @Author : Maple
# @File : 04-网络下载asyncio版本(1).py
# @Software: PyCharm
import asyncio
import time
from random import randint

# How our coroutine chain is driven:
# 1. We never call next() or send() ourselves (compare the "delegating
#    generator" section of the chapter 16 coroutine notes). Instead the
#    outermost delegating coroutine is handed to an asyncio API
#    (loop.run_until_complete, asyncio.run, ...), which drives it.
# 2. Our chain ultimately delegates (yield from / await) to a coroutine
#    provided by asyncio itself, e.g. asyncio.sleep(download_time) here. In
#    other words the innermost "subgenerator" is a library function that does
#    the real I/O, not a function we wrote.
# 3. In general, what we write are only the delegating generators.

CC_LIST = [1, 2, 3, 4, 5, 6]


def get_randint():
    """Return a random simulated download time in seconds (1-10 inclusive)."""
    return randint(1, 10)


async def get_img(id):
    """Simulate downloading image *id*; return how many seconds it "took"."""
    print('***', id, '号图片开始下载***')
    download_time = get_randint()
    # Non-blocking wait: the other downloads keep progressing meanwhile.
    await asyncio.sleep(download_time)
    return download_time


async def download_one_img(id):
    """Download image *id*, report the elapsed time, return its (fake) content."""
    result = await get_img(id)
    print('***{}号图片下载完成,花费时长{}s***'.format(id, result))
    return str(id) + '号图片内容'


async def _download_all(cc_list):
    """Download every image in *cc_list* concurrently; return how many finished."""
    if not cc_list:
        # asyncio.wait() raises ValueError when given an empty collection.
        return 0
    # Since Python 3.11 asyncio.wait() no longer accepts bare coroutines, so
    # wrap each one in a Task (Task is a subclass of Future) explicitly.
    to_do = [asyncio.create_task(download_one_img(c)) for c in sorted(cc_list)]
    # wait() returns (done, pending). Without timeout/return_when it returns
    # only once everything has finished, so pending is always empty here.
    done, _ = await asyncio.wait(to_do)
    return len(done)


def download_many_img(cc_list):
    """Download all images in *cc_list* and return how many completed.

    asyncio.run() creates a fresh event loop, blocks here until
    _download_all() finishes and then closes that loop. The previous
    get_event_loop() + loop.close() version closed the default loop, so a
    second call failed with "RuntimeError: Event loop is closed".
    """
    return asyncio.run(_download_all(cc_list))


def main(download_many_img):
    """Time one call of *download_many_img* over CC_LIST and print a summary."""
    t0 = time.time()
    count = download_many_img(CC_LIST)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many_img)
6、网络下载asyncio版本(2)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 17:50
# @Author : Maple
# @File : 05-网路下载asyncio版本(2).py
# @Software: PyCharm"""
使用asyncio.as_completed函数,由于
1. 该函数必须在协程中使用
2. download_many_img 不能是协程:main 函数会像调用普通函数那样直接调用传入的 download_many_img。因此需要新增一个 downloader_coro 协程函数,专门负责并发(协程)下载,而 download_many_img 仅负责启动事件循环并驱动 downloader_coro
"""
import asyncio
import time
from random import randint

CC_LIST = [1, 2, 3, 4, 5, 6]


def get_randint():
    """Return a random simulated download time in seconds (1-10 inclusive)."""
    return randint(1, 10)


async def get_img(id):
    """Simulate downloading image *id*; return how many seconds it "took"."""
    print('***', id, '号图片开始下载***')
    download_time = get_randint()
    # Non-blocking wait: the other downloads keep progressing meanwhile.
    await asyncio.sleep(download_time)
    return download_time


async def download_one_img(id):
    """Download image *id*, report the elapsed time, return its (fake) content."""
    result = await get_img(id)
    print('***{}号图片下载完成,花费时长{}s***'.format(id, result))
    return str(id) + '号图片内容'


async def downloader_coro(cc_list):
    """Download every image in *cc_list* concurrently and count them.

    Each result is printed as soon as its download finishes (fastest first).

    :return: number of images downloaded.
    """
    count = 0
    # Build the coroutine objects; nothing runs until they are scheduled.
    to_do = [download_one_img(c) for c in sorted(cc_list)]
    # as_completed() schedules them (wrapping each in a Task) and yields
    # awaitables in completion order rather than submission order.
    to_do_iter = asyncio.as_completed(to_do)
    for future in to_do_iter:
        # Produces the return value of the next download_one_img() to finish.
        res = await future
        count += 1
        print(res)
    return count


def download_many_img(cc_list):
    """Run downloader_coro() on a fresh event loop and return its count.

    asyncio.run() creates the loop, blocks until downloader_coro() finishes and
    then closes the loop. The previous get_event_loop() + loop.close() version
    closed the default loop, so a second call failed with
    "RuntimeError: Event loop is closed".
    """
    return asyncio.run(downloader_coro(cc_list))


def main(download_many_img):
    """Time one call of *download_many_img* over CC_LIST and print a summary."""
    t0 = time.time()
    count = download_many_img(CC_LIST)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many_img)
7、一次性多次请求
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 20:14
# @Author : Maple
# @File : 06-每次多次请求.py
# @Software: PyCharm
import asyncio
import json
import re
from asgiref.sync import sync_to_async


def regex_data(data):
    """Extract the ``result=...`` parts from the repr of finished asyncio Tasks.

    :param data: raw value returned by asyncio.wait (a ``(done, pending)`` pair
        of Task sets) or anything whose ``str()`` contains Task reprs, e.g.
        ``({<Task finished name='Task-2' coro=<get_info() done, defined at ...:47> result='China'>}, set())``
    :return: list with the text after each ``result=`` up to the next ``>``,
        e.g. ``["'China'"]`` for the sample above.

    Note: this parses a *repr*. asyncio abbreviates long results there
    (``result=b'\\xff\\xd8\\xf...'``), and a value may itself contain ``>``,
    so prefer reading ``task.result()`` when the real value is needed.
    """
    # The original pattern 'result=(.*?)' ended in a lazy group with nothing
    # after it, so it always captured an empty string.
    regex = re.compile(r'result=(.*?)>')
    return regex.findall(str(data))


def read_json(country):
    """Return ``data[country]`` from the JSON object stored in data/name.json."""
    # JSON text is UTF-8 by spec; without an explicit encoding, open() falls
    # back to the platform locale (e.g. GBK on Chinese Windows).
    with open('data/name.json', 'r', encoding='utf-8') as f:
        # json.load returns the top-level JSON object as a dict.
        data = json.load(f)
    return data[country]


async def read_json_async(country_code):
    """Awaitable wrapper around read_json(); returns 'code不存在' on any failure."""
    try:
        # sync_to_async runs the blocking file read outside the event loop.
        data = await sync_to_async(read_json)(country_code)
    except Exception:
        # Deliberate best-effort lookup: a missing code (or unreadable file)
        # yields a placeholder instead of an exception.
        data = 'code不存在'
    return data


def read_img(path):
    """Return the raw bytes of the file at *path*."""
    with open(path, 'rb') as f:
        return f.read()


# Reading a binary file with f.read() is a blocking call that cannot be
# awaited directly, so it is wrapped with sync_to_async below.
async def read_img_async(path):
    """Awaitable wrapper around read_img(); returns 'code不存在' on any failure."""
    # Convert the synchronous function to async. (The original author reported
    # an error when the wrapped function took no arguments; cause not
    # investigated.)
    try:
        data = await sync_to_async(read_img)(path)
    except Exception:
        # NOTE(review): the placeholder text says "code does not exist" even
        # though the failing input here is an image path.
        data = 'code不存在'
    return data


async def get_info(type, code=None, path=None):
    """Fetch one piece of profile data.

    :param type: 'json' to look up the name stored under *code*;
        'img' to read the image bytes at *path*.
    :raises ValueError: for any other *type* (previously this surfaced as an
        UnboundLocalError).
    """
    if type == 'json':
        return await read_json_async(code)
    if type == 'img':
        return await read_img_async(path)
    raise ValueError('unsupported type: {!r}'.format(type))


async def get_names(code, path):
    """Return ``(name, image_data)`` for one profile."""
    darling = await get_info('json', code=code)
    img_data = await get_info('img', path=path)
    return (darling, img_data)


async def _gather_names(requests):
    """Run get_names() for every (code, path) pair concurrently, in input order."""
    return await asyncio.gather(*(get_names(code=c, path=p) for c, p in requests))


def download_many(code=None, path=None):
    """Fetch name and photo for two hard-coded profiles concurrently and print them.

    *code* and *path* are currently unused (kept for interface compatibility);
    the requests below are hard-coded.

    :return: list of ``(name, image_bytes_or_placeholder)`` tuples in request order.
    """
    requests = [('0001', 'data/profile_photo.jpg'),
                ('0002', 'data/profile_photo.jpg')]
    # asyncio.wait() rejects bare coroutines since Python 3.11 and returns
    # unordered sets of Tasks whose values had to be regex-parsed from their
    # repr. gather() accepts coroutines, keeps the input order and returns the
    # results directly. asyncio.run() creates and always closes its own loop.
    results = asyncio.run(_gather_names(requests))
    for name, img_data in results:
        if isinstance(img_data, bytes):
            print(name, '{} bytes'.format(len(img_data)))
        else:
            print(name, img_data)
    return results


def main(download_many):
    """Entry point: run *download_many* with its default arguments."""
    download_many()


if __name__ == '__main__':
    main(download_many)