文章目录
- 1. 线程与协程对比
- 2. 使用 asyncio 和 aiohttp 下载
- 3. 避免阻塞型调用
- 4. 使用 asyncio.as_completed
- 5. 使用Executor对象,防止阻塞事件循环
- 6. 从回调到期物和协程
learn from 《流畅的python》
1. 线程与协程对比
threading
import threading
import itertools
import time
import sys


class Signal:
    """Mutable flag shared with the spinner thread; set ``go = False`` to stop it."""

    go = True


def spin(msg, signal):
    """Draw a rotating bar followed by *msg* until ``signal.go`` becomes false."""
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):  # endless cycle of spinner frames
        status = char + ' ' + msg
        write(status)
        flush()
        write("\x08" * len(status))  # \x08 is backspace: move the cursor back
        time.sleep(0.1)
        if not signal.go:
            break
    # Blank out the status text and move the cursor back to the line start.
    write(' ' * len(status) + "\x08" * len(status))


def slow_function():
    """Pretend to be a long computation: wait 10 seconds, then return 42."""
    # time.sleep blocks the main thread but releases the GIL, so the spinner
    # thread keeps running in the meantime.
    time.sleep(10)
    return 42


def supervisor(job=slow_function):
    """Run *job* in the main thread while a spinner thread animates.

    Starts the spinner thread, prints the thread object, runs the blocking
    ``job`` and finally stops and joins the spinner.

    Args:
        job: zero-argument callable to run; defaults to ``slow_function``.

    Returns:
        Whatever ``job()`` returns.

    Raises:
        Any exception raised by ``job``; the spinner is stopped first.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=("thinking!", signal))
    print("spinner object:", spinner)  # show the secondary thread object
    spinner.start()
    try:
        # Blocks the main thread while the spinner thread animates.
        return job()
    finally:
        # Always stop the spinner.  Otherwise an exception in job() would
        # leave this non-daemon thread spinning forever and block exit.
        signal.go = False
        spinner.join()


def main():
    result = supervisor()
    print("Answer:", result)


if __name__ == '__main__':
    main()
适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用(该语法已过时,新版本使用 async / await),或者把协程传给 asyncio 包中的某个函数。
一篇博文参考:https://www.cnblogs.com/dhcn/p/9032461.html
import asyncio
import itertools
import sys
# https://docs.python.org/3.8/library/asyncio.html


async def spin(msg):
    """Coroutine: draw a rotating bar followed by *msg* until cancelled.

    Uses the ``async``/``await`` syntax (Python 3.5+).  Cancellation is caught
    inside the loop.  The coroutine then prints ``cancel``, clears the line,
    prints ``end spin`` and returns normally.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):  # endless cycle of spinner frames
        status = char + ' ' + msg
        write(status)
        flush()
        write("\x08" * len(status))  # \x08 is backspace: move the cursor back
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:  # the task was cancelled: leave the loop
            print("cancel")
            break
    write(' ' * len(status) + "\x08" * len(status))
    print("end spin")


async def slow_function():
    """Coroutine that pretends to do 3 seconds of I/O and returns 42."""
    print("start IO")
    await asyncio.sleep(3)  # stand-in for a real I/O operation
    print("end IO ")
    return 42


async def supervisor():
    """Run slow_function() while a spinner Task animates; return its result."""
    spinner = asyncio.ensure_future(spin("thinking!"))  # schedule the spinner Task
    print("spinner object:", spinner)  # a Task object, not a thread
    # e.g. spinner object: <Task pending coro=<spin() running at D:\ >
    print("start slow")
    try:
        result = await slow_function()
        print("end slow")
    finally:
        # cancel() throws CancelledError into spin().  Awaiting the task lets
        # spin() finish its cleanup before we return, instead of relying on
        # the event loop to happen to run one more iteration.
        spinner.cancel()
        await spinner
    return result


def main():
    # asyncio.run creates a fresh event loop, drives supervisor() to
    # completion and closes the loop.  Calling asyncio.get_event_loop() with
    # no running loop is deprecated since Python 3.10.
    result = asyncio.run(supervisor())
    print("answer:", result)


if __name__ == '__main__':
    main()
输出:
spinner object: <Task pending coro=<spin() running at D:\gitcode >
start slow
start IO
end IO ng!(期间thinking!在输出,后来被覆盖)
end slow
cancel
end spin
answer: 42
请按任意键继续. . .
2. 使用 asyncio 和 aiohttp 下载
import time
import sys
import os
import asyncio

import aiohttp

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'


def save_flag(img, filename):
    """Write the image bytes *img* to DEST_DIR/filename."""
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def show(text):
    """Print *text* followed by a space (no newline) and flush stdout."""
    print(text, end=' ')
    sys.stdout.flush()


async def get_flag(cc):
    """Download and return the GIF bytes for country code *cc*.

    Raises aiohttp.ClientResponseError for an error status (>= 400), so an
    error page is never saved as if it were a flag image.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.request("GET", url) as resp:
        resp.raise_for_status()
        image = await resp.read()
    return image


async def download_one(cc):
    """Download the flag for *cc*, save it as ``<cc>.gif`` and return *cc*."""
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


async def _download_all(cc_list):
    """Download all flags concurrently; return how many were saved."""
    results = await asyncio.gather(
        *(download_one(cc) for cc in sorted(cc_list)),
        return_exceptions=True,  # one failure must not hide the other results
    )
    failures = [res for res in results if isinstance(res, BaseException)]
    for exc in failures:
        print('\n*** download failed: {!r}'.format(exc), file=sys.stderr)
    return len(results) - len(failures)


def download_many_(cc_list):
    """Download the flags for *cc_list* and return the number saved.

    asyncio.wait() no longer accepts bare coroutines (removed in Python 3.11),
    so the coroutines are run with asyncio.gather() instead.  asyncio.run
    creates the event loop and closes it when done, so no explicit
    loop.close() is needed.  The ``async with`` in get_flag only closes the
    HTTP response/session, not the loop.
    """
    return asyncio.run(_download_all(cc_list))


def main(download_many):
    """Run *download_many* on POP20_CC and print how long it took."""
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))  # timing report


if __name__ == '__main__':
    main(download_many_)
# Sample output:
# US RU ID ET BR FR CN PH BD NG DE JP EG TR MX IN PK IR CD VN
# 20 flags downloaded in 3.88s
3. 避免阻塞型调用
执行硬盘或网络 I/O 操作的函数定义为阻塞型函数。
有两种方法能避免阻塞型调用中止整个应用程序的进程:
- 在单独的线程中运行各个阻塞型操作;
- 把每个阻塞型操作转换成非阻塞的异步调用。
4. 使用 asyncio.as_completed
import collections
import time
import sys
import os
import asyncio
from http import HTTPStatus

import aiohttp
from aiohttp import web
import tqdm

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
# http.HTTPStatus has no generic "error" member (HTTPStatus.error raised
# AttributeError), so failed downloads are counted under this key instead.
ERROR_STATUS = 'error'


class FetchError(Exception):
    """Raised by download_one() when fetching one flag fails unexpectedly."""

    def __init__(self, country_code):
        super().__init__(country_code)
        self.country_code = country_code  # country whose download failed


def save_flag(img, filename):
    """Write the image bytes *img* to DEST_DIR/filename."""
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def show(text):
    """Print *text* followed by a space (no newline) and flush stdout."""
    print(text, end=' ')
    sys.stdout.flush()


async def get_flag(cc):
    """Return the GIF bytes for country code *cc*.

    Raises:
        web.HTTPNotFound: the server answered 404.
        aiohttp.WebSocketError: any other non-200 status.  It is used here only
            as a carrier for (status, reason), so ``args[0]`` is the status.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.request("GET", url) as resp:
        if resp.status == 200:
            return await resp.read()
        if resp.status == 404:
            raise web.HTTPNotFound()
        raise aiohttp.WebSocketError(code=resp.status, message=resp.reason)


async def download_one(cc, semaphore, verbose):
    """Download and save one flag; return ``(status, cc)``.

    Only the network fetch runs under *semaphore*, which caps the number of
    concurrent requests.  A 404 yields HTTPStatus.NOT_FOUND.  Any other
    failure is re-raised as FetchError(cc), with the original exception
    chained as ``__cause__``.
    """
    try:
        async with semaphore:
            image = await get_flag(cc)
    except web.HTTPNotFound:
        status = HTTPStatus.NOT_FOUND
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.OK
        msg = "OK"
    if verbose and msg:
        print(cc, msg)
    return (status, cc)


async def downloader_coro(cc_list, verbose, concur_req):
    """Download all flags with at most *concur_req* requests in flight.

    Returns a collections.Counter mapping each status to the number of flags
    that ended with it.  When *verbose* is false, a tqdm progress bar is shown
    instead of per-flag messages.
    """
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(value=concur_req)
    # Forward the caller's verbose flag.  Hard-coding True printed every flag
    # even while the progress bar was shown.
    todo = [download_one(cc, semaphore, verbose) for cc in sorted(cc_list)]
    # as_completed yields awaitables in the order the downloads finish.
    todo_iter = asyncio.as_completed(todo)
    if not verbose:
        todo_iter = tqdm.tqdm(todo_iter, total=len(cc_list))
    for future in todo_iter:
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
            except IndexError:
                # The underlying exception has no args: report its type name.
                error_msg = exc.__cause__.__class__.__name__
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = ERROR_STATUS
        else:
            status = res[0]
        counter[status] += 1
    return counter


def download_many_(cc_list, verbose, concur_req):
    """Run downloader_coro() to completion and return its Counter.

    asyncio.run creates the event loop, runs the coroutine and closes the
    loop.  Calling asyncio.get_event_loop() with no running loop is deprecated
    since Python 3.10.
    """
    coro = downloader_coro(cc_list, verbose=verbose, concur_req=concur_req)
    return asyncio.run(coro)


def main(download_many):
    """Run *download_many* on POP20_CC and print the result and elapsed time."""
    t0 = time.time()
    count = download_many(POP20_CC, True, MAX_CONCUR_REQ)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))  # timing report


if __name__ == '__main__':
    main(download_many_)
5. 使用Executor对象,防止阻塞事件循环
使用 loop.run_in_executor 方法把阻塞的作业(例如保存文件)委托给线程池执行。
async def download_one(cc, semaphore, verbose):
    """Download one flag and save it without blocking the event loop.

    Returns ``(status, cc)``.  A 404 yields HTTPStatus.NOT_FOUND; any other
    fetch failure is re-raised as FetchError(cc).  Writing the file is a
    blocking disk operation.  Run on the event-loop thread, it would freeze
    every other coroutine, so it is handed to the loop's default
    ThreadPoolExecutor instead.  An error while saving propagates to the
    caller.
    """
    try:
        async with semaphore:
            image = await get_flag(cc)
    except web.HTTPNotFound:
        status = HTTPStatus.NOT_FOUND
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_running_loop()  # the loop running this coroutine
        # First argument is the Executor; None selects the loop's default
        # ThreadPoolExecutor.  The rest are the callable and its positional
        # arguments.  Awaiting the returned future means a failed save raises
        # here instead of being silently dropped while reporting "OK".  It
        # also ensures the file is written before success is reported.
        await loop.run_in_executor(None, save_flag, image, cc.lower() + ".gif")
        status = HTTPStatus.OK
        msg = "OK"
    if verbose and msg:
        print(cc, msg)
    return (status, cc)
6. 从回调到期物和协程
- 如果一个操作需要依赖之前操作的结果,那就得嵌套回调
# Callback style (illustrative pseudo-code: step*, api_call* and request1 are
# placeholders).  Each dependent stage is passed as the callback of the
# previous API call, so the control flow is split across nested callbacks.
def stage1(response1):
    request2 = step1(response1)
    api_call2(request2, stage2)


def stage2(response2):
    request3 = step2(response2)
    api_call3(request3, stage3)


def stage3(response3):
    step3(response3)


api_call1(request1, stage1)
好的写法:
async def three_stages(request1):
    """Run the three dependent API calls in sequence using await."""
    response1 = await api_call1(request1)  # stage 1
    request2 = step1(response1)
    response2 = await api_call2(request2)  # stage 2
    request3 = step2(response2)
    response3 = await api_call3(request3)  # stage 3
    step3(response3)


# The coroutine only runs once it is explicitly scheduled on the event loop.
# Keep a reference to the Task: the loop holds only weak references to tasks,
# so an unreferenced task may be garbage-collected before it finishes.
task = loop.create_task(three_stages(request1))
协程必须通过事件循环显式排定执行时间。
异步系统能避免用户级线程的开销,这是它能比多线程系统管理更多并发连接的主要原因。